Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package valblk
6 :
7 : import (
8 : "encoding/binary"
9 : "math/rand/v2"
10 : "sync"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/sstable/block"
15 : )
16 :
17 : // Writer writes a sequence of value blocks, and the value blocks index, for an
18 : // sstable.
19 : type Writer struct {
20 : flush block.FlushGovernor
21 : // Configured compression.
22 : compression block.Compression
23 : // checksummer with configured checksum type.
24 : checksummer block.Checksummer
25 : // Block finished callback.
26 : blockFinishedFunc func(compressedSize int)
27 :
28 : // buf is the current block being written to (uncompressed).
29 : buf *blockBuffer
30 : // compressedBuf is used for compressing the block.
31 : compressedBuf *blockBuffer
32 : // Sequence of blocks that are finished.
33 : blocks []bufferedValueBlock
34 : // Cumulative value block bytes written so far.
35 : totalBlockBytes uint64
36 : numValues uint64
37 : }
38 :
39 : type bufferedValueBlock struct {
40 : block block.PhysicalBlock
41 : buffer *blockBuffer
42 : handle block.Handle
43 : }
44 :
45 : type blockBuffer struct {
46 : b []byte
47 : }
48 :
49 : // Pool of block buffers that should be roughly the blockSize.
50 : var uncompressedValueBlockBufPool = sync.Pool{
51 1 : New: func() interface{} {
52 1 : return &blockBuffer{}
53 1 : },
54 : }
55 :
56 : // Pool of block buffers for compressed value blocks. These may widely vary in
57 : // size based on compression ratios.
58 : var compressedValueBlockBufPool = sync.Pool{
59 1 : New: func() interface{} {
60 1 : return &blockBuffer{}
61 1 : },
62 : }
63 :
64 1 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
65 1 : // Don't pool buffers larger than 128KB, in case we had some rare large
66 1 : // values.
67 1 : if len(b.b) > 128*1024 {
68 0 : return
69 0 : }
70 1 : if invariants.Enabled {
71 1 : // Set the bytes to a random value. Cap the number of bytes being
72 1 : // randomized to prevent test timeouts.
73 1 : length := cap(b.b)
74 1 : if length > 1000 {
75 1 : length = 1000
76 1 : }
77 1 : b.b = b.b[:length:length]
78 1 : for j := range b.b {
79 1 : b.b[j] = byte(rand.Uint32())
80 1 : }
81 : }
82 1 : pool.Put(b)
83 : }
84 :
85 : var valueBlockWriterPool = sync.Pool{
86 1 : New: func() interface{} {
87 1 : return &Writer{}
88 1 : },
89 : }
90 :
91 : // NewWriter creates a new Writer for value blocks and the value blocks index.
92 : func NewWriter(
93 : flushGovernor block.FlushGovernor,
94 : compression block.Compression,
95 : checksumType block.ChecksumType,
96 : // compressedSize should exclude the block trailer.
97 : blockFinishedFunc func(compressedSize int),
98 1 : ) *Writer {
99 1 : w := valueBlockWriterPool.Get().(*Writer)
100 1 : *w = Writer{
101 1 : flush: flushGovernor,
102 1 : compression: compression,
103 1 : checksummer: block.Checksummer{
104 1 : Type: checksumType,
105 1 : },
106 1 : blockFinishedFunc: blockFinishedFunc,
107 1 : buf: uncompressedValueBlockBufPool.Get().(*blockBuffer),
108 1 : compressedBuf: compressedValueBlockBufPool.Get().(*blockBuffer),
109 1 : blocks: w.blocks[:0],
110 1 : }
111 1 : w.buf.b = w.buf.b[:0]
112 1 : w.compressedBuf.b = w.compressedBuf.b[:0]
113 1 : return w
114 1 : }
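
// A hedged usage sketch (not part of this package's API): the typical
// lifecycle is NewWriter, a sequence of AddValue calls whose returned Handles
// are stored alongside the corresponding keys, a single Finish once the file
// offset for the value blocks is known, and then Release. The parameters of
// exampleWriterLifecycle are assumed to be supplied by the caller; the
// function name is illustrative.
func exampleWriterLifecycle(
	flushGovernor block.FlushGovernor,
	compression block.Compression,
	checksumType block.ChecksumType,
	layout LayoutWriter,
	fileOffset uint64,
) (IndexHandle, WriterStats, error) {
	w := NewWriter(flushGovernor, compression, checksumType, func(compressedSize int) {})
	defer w.Release()
	// A real caller records the returned Handle next to the key it belongs to.
	if _, err := w.AddValue([]byte("example-value")); err != nil {
		return IndexHandle{}, WriterStats{}, err
	}
	return w.Finish(layout, fileOffset)
}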
115 :
116 : // AddValue adds a value to the writer, returning a Handle referring to the
117 : // stored value.
118 1 : func (w *Writer) AddValue(v []byte) (Handle, error) {
119 1 : if invariants.Enabled && len(v) == 0 {
120 0 : return Handle{}, errors.Errorf("cannot write empty value to value block")
121 0 : }
122 1 : w.numValues++
123 1 : blockLen := len(w.buf.b)
124 1 : valueLen := len(v)
125 1 : if w.flush.ShouldFlush(blockLen, blockLen+valueLen) {
126 1 : // The block is not empty and would become too big once this value is added,
127 1 : // so finish the current block first.
128 1 : w.compressAndFlush()
129 1 : blockLen = len(w.buf.b)
130 1 : if invariants.Enabled && blockLen != 0 {
131 0 : panic("blockLen of new block should be 0")
132 : }
133 : }
134 1 : vh := Handle{
135 1 : ValueLen: uint32(valueLen),
136 1 : BlockNum: uint32(len(w.blocks)),
137 1 : OffsetInBlock: uint32(blockLen),
138 1 : }
139 1 : blockLen = int(vh.OffsetInBlock + vh.ValueLen)
140 1 : if cap(w.buf.b) < blockLen {
141 1 : size := 2 * cap(w.buf.b)
142 1 : if size < 1024 {
143 1 : size = 1024
144 1 : }
145 1 : for size < blockLen {
146 0 : size *= 2
147 0 : }
148 1 : buf := make([]byte, blockLen, size)
149 1 : _ = copy(buf, w.buf.b)
150 1 : w.buf.b = buf
151 1 : } else {
152 1 : w.buf.b = w.buf.b[:blockLen]
153 1 : }
154 1 : buf := w.buf.b[vh.OffsetInBlock:]
155 1 : n := copy(buf, v)
156 1 : if n != len(buf) {
157 0 : panic("incorrect length computation")
158 : }
159 1 : return vh, nil
160 : }
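
// A minimal standalone sketch of the buffer growth policy AddValue uses above:
// when the current buffer cannot hold the new total length, the capacity is
// doubled (from a 1KB floor) until it fits, and the existing contents are
// copied into the new buffer. growValueBufferSketch is illustrative and not
// part of this package.
func growValueBufferSketch(b []byte, requiredLen int) []byte {
	if cap(b) >= requiredLen {
		return b[:requiredLen]
	}
	size := 2 * cap(b)
	if size < 1024 {
		size = 1024
	}
	for size < requiredLen {
		size *= 2
	}
	grown := make([]byte, requiredLen, size)
	copy(grown, b)
	return grown
}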
161 :
162 : // Size returns the total size of currently buffered value blocks.
163 1 : func (w *Writer) Size() uint64 {
164 1 : sz := w.totalBlockBytes
165 1 : if w.buf != nil {
166 1 : sz += uint64(len(w.buf.b))
167 1 : }
168 1 : return sz
169 : }
170 :
171 1 : func (w *Writer) compressAndFlush() {
172 1 : w.compressedBuf.b = w.compressedBuf.b[:cap(w.compressedBuf.b)]
173 1 : physicalBlock := block.CompressAndChecksum(&w.compressedBuf.b, w.buf.b, w.compression, &w.checksummer)
174 1 : bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.LengthWithoutTrailer())}
175 1 : w.totalBlockBytes += uint64(physicalBlock.LengthWithTrailer())
176 1 : // The length reported to blockFinishedFunc excludes the block trailer.
177 1 : w.blockFinishedFunc(physicalBlock.LengthWithoutTrailer())
178 1 : b := bufferedValueBlock{
179 1 : block: physicalBlock,
180 1 : handle: bh,
181 1 : }
182 1 :
183 1 : // We'll hand off a buffer to w.blocks and get a new one. Which buffer we're
184 1 : // handing off depends on the outcome of compression.
185 1 : if physicalBlock.IsCompressed() {
186 1 : b.buffer = w.compressedBuf
187 1 : w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
188 1 : } else {
189 1 : b.buffer = w.buf
190 1 : w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
191 1 : }
192 1 : w.blocks = append(w.blocks, b)
193 1 : w.buf.b = w.buf.b[:0]
194 : }
195 :
196 1 : func (w *Writer) computeChecksum(blk []byte) {
197 1 : n := len(blk) - block.TrailerLen
198 1 : checksum := w.checksummer.Checksum(blk[:n], blk[n])
199 1 : binary.LittleEndian.PutUint32(blk[n+1:], checksum)
200 1 : }
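
// A sketch of the trailer layout computeChecksum fills in: the final
// block.TrailerLen bytes of a finished block are a 1-byte compression
// indicator followed by a little-endian uint32 checksum covering the block
// contents and the indicator byte. The checksum value is taken as a parameter
// here because the real computation depends on the configured
// block.Checksummer; appendTrailerSketch is illustrative and not part of this
// package.
func appendTrailerSketch(blockContents []byte, compressionIndicator byte, checksum uint32) []byte {
	blk := append(blockContents, compressionIndicator, 0, 0, 0, 0)
	// The checksum occupies the 4 bytes immediately after the indicator byte.
	binary.LittleEndian.PutUint32(blk[len(blk)-4:], checksum)
	return blk
}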
201 :
202 : // Finish finishes writing the value blocks and value blocks index, writing the
203 : // buffered blocks out to the provided layout writer.
204 1 : func (w *Writer) Finish(layout LayoutWriter, fileOffset uint64) (IndexHandle, WriterStats, error) {
205 1 : if len(w.buf.b) > 0 {
206 1 : w.compressAndFlush()
207 1 : }
208 1 : n := len(w.blocks)
209 1 : if n == 0 {
210 1 : return IndexHandle{}, WriterStats{}, nil
211 1 : }
212 1 : largestOffset := uint64(0)
213 1 : largestLength := uint64(0)
214 1 : for i := range w.blocks {
215 1 : _, err := layout.WriteValueBlock(w.blocks[i].block)
216 1 : if err != nil {
217 0 : return IndexHandle{}, WriterStats{}, err
218 0 : }
219 1 : w.blocks[i].handle.Offset += fileOffset
220 1 : largestOffset = w.blocks[i].handle.Offset
221 1 : if largestLength < w.blocks[i].handle.Length {
222 1 : largestLength = w.blocks[i].handle.Length
223 1 : }
224 : }
225 1 : vbihOffset := fileOffset + w.totalBlockBytes
226 1 :
227 1 : vbih := IndexHandle{
228 1 : Handle: block.Handle{
229 1 : Offset: vbihOffset,
230 1 : },
231 1 : BlockNumByteLength: uint8(lenLittleEndian(uint64(n - 1))),
232 1 : BlockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
233 1 : BlockLengthByteLength: uint8(lenLittleEndian(largestLength)),
234 1 : }
235 1 : var err error
236 1 : if n > 0 {
237 1 : if vbih, err = w.writeValueBlocksIndex(layout, vbih); err != nil {
238 0 : return IndexHandle{}, WriterStats{}, err
239 0 : }
240 : }
241 1 : stats := WriterStats{
242 1 : NumValueBlocks: uint64(n),
243 1 : NumValuesInValueBlocks: w.numValues,
244 1 : ValueBlocksAndIndexSize: w.totalBlockBytes + vbih.Handle.Length + block.TrailerLen,
245 1 : }
246 1 : return vbih, stats, err
247 : }
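
// Finish sizes the index entries above with lenLittleEndian, defined elsewhere
// in this package. As a hedged sketch of what that helper computes, the
// minimum little-endian width of v is the number of bytes consumed before the
// remaining value reaches zero, with a minimum of one byte. The name
// lenLittleEndianSketch is illustrative.
func lenLittleEndianSketch(v uint64) int {
	n := 1
	for v >>= 8; v > 0; v >>= 8 {
		n++
	}
	return n
}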
248 :
249 1 : func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (IndexHandle, error) {
250 1 : blockLen :=
251 1 : int(h.BlockNumByteLength+h.BlockOffsetByteLength+h.BlockLengthByteLength) * len(w.blocks)
252 1 : h.Handle.Length = uint64(blockLen)
253 1 : blockLen += block.TrailerLen
254 1 : var buf []byte
255 1 : if cap(w.buf.b) < blockLen {
256 1 : buf = make([]byte, blockLen)
257 1 : w.buf.b = buf
258 1 : } else {
259 1 : buf = w.buf.b[:blockLen]
260 1 : }
261 1 : b := buf
262 1 : for i := range w.blocks {
263 1 : littleEndianPut(uint64(i), b, int(h.BlockNumByteLength))
264 1 : b = b[int(h.BlockNumByteLength):]
265 1 : littleEndianPut(w.blocks[i].handle.Offset, b, int(h.BlockOffsetByteLength))
266 1 : b = b[int(h.BlockOffsetByteLength):]
267 1 : littleEndianPut(w.blocks[i].handle.Length, b, int(h.BlockLengthByteLength))
268 1 : b = b[int(h.BlockLengthByteLength):]
269 1 : }
270 1 : if len(b) != block.TrailerLen {
271 0 : panic("incorrect length calculation")
272 : }
273 1 : b[0] = byte(block.NoCompressionIndicator)
274 1 : w.computeChecksum(buf)
275 1 : if _, err := layout.WriteValueIndexBlock(buf, h); err != nil {
276 0 : return IndexHandle{}, err
277 0 : }
278 1 : return h, nil
279 : }
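
// A minimal sketch of reading back one fixed-width index entry written above:
// each entry encodes the block number, file offset, and length little-endian
// in the byte widths recorded in the IndexHandle. littleEndianGetSketch is an
// illustrative inverse of littleEndianPut (defined elsewhere in this package),
// not the real decoding path.
func littleEndianGetSketch(b []byte, n int) uint64 {
	var v uint64
	for i := n - 1; i >= 0; i-- {
		v = v<<8 | uint64(b[i])
	}
	return v
}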
280 :
281 : // Release relinquishes the resources held by the writer and returns the Writer
282 : // to a pool.
283 1 : func (w *Writer) Release() {
284 1 : for i := range w.blocks {
285 1 : if w.blocks[i].block.IsCompressed() {
286 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].buffer)
287 1 : } else {
288 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].buffer)
289 1 : }
290 1 : w.blocks[i].buffer = nil
291 : }
292 1 : if w.buf != nil {
293 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
294 1 : }
295 1 : if w.compressedBuf != nil {
296 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
297 1 : }
298 1 : *w = Writer{
299 1 : blocks: w.blocks[:0],
300 1 : }
301 1 : valueBlockWriterPool.Put(w)
302 : }
303 :
304 : // WriterStats contains statistics about the value blocks and value index block
305 : // written by a Writer.
306 : type WriterStats struct {
307 : NumValueBlocks uint64
308 : NumValuesInValueBlocks uint64
309 : // Includes both value blocks and value index block.
310 : ValueBlocksAndIndexSize uint64
311 : }
312 :
313 : // TODO(jackson): Refactor the Writer into an Encoder and move the onus of
314 : // calling into the sstable.layoutWriter onto the sstable.Writer.
315 :
316 : // LayoutWriter defines the interface for a writer that writes out serialized
317 : // value and value index blocks.
318 : type LayoutWriter interface {
319 : // WriteValueBlock writes a pre-finished value block (with the trailer) to
320 : // the writer.
321 : WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error)
322 : // WriteValueIndexBlock writes a pre-finished value index block to the
323 : // writer.
324 : WriteValueIndexBlock(blk []byte, vbih IndexHandle) (block.Handle, error)
325 : }
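
// A hedged sketch of a LayoutWriter that only tracks sizes, illustrating the
// contract above without persisting any bytes. sizeOnlyLayoutWriter and its
// accounting are illustrative and not part of this package; a real
// implementation writes the serialized blocks to the sstable.
type sizeOnlyLayoutWriter struct {
	offset uint64
}

func (s *sizeOnlyLayoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
	h := block.Handle{Offset: s.offset, Length: uint64(blk.LengthWithoutTrailer())}
	s.offset += uint64(blk.LengthWithTrailer())
	return h, nil
}

func (s *sizeOnlyLayoutWriter) WriteValueIndexBlock(blk []byte, vbih IndexHandle) (block.Handle, error) {
	h := block.Handle{Offset: s.offset, Length: uint64(len(blk) - block.TrailerLen)}
	s.offset += uint64(len(blk))
	return h, nil
}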