Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package colblk implements a columnar block format.
6 : //
7 : // The columnar block format organizes data into columns. The number of columns
8 : // and their data types depend on the use case and are configurable.
9 : //
10 : // # Format
11 : //
12 : // Every columnar block begins with a header describing the structure and schema
13 : // of the block. Then column values are encoded in sequence. The block ends
14 : // with a single padding byte.
15 : //
16 : // The block header begins with:
17 : // - Version number (1 byte)
18 : // - The number of columns in the block (2 bytes)
19 : // - The number of rows in the block (4 bytes)
20 : //
21 : // Then follows a column header for each column. Each column header encodes the
22 : // data type (1 byte) and the offset into the block where the column data begins
23 : // (4 bytes).
24 : //
25 : // +-----------+
26 : // | Vers (1B) |
27 : // +-------------------+--------------------------------+
28 : // | # columns (2B) | # of rows (4B) |
29 : // +-----------+-------+---------------------+----------+
30 : // | Type (1B) | Page offset (4B) | Col 0
31 : // +-----------+---------------------------------+
32 : // | Type (1B) | Page offset (4B) | Col 1
33 : // +-----------+---------------------------------+
34 : // | ... | ... |
35 : // +-----------+---------------------------------+
36 : // | Type (1B) | Page offset (4B) | Col n-1
37 : // +-----------+----------------------------------
38 : // | column 0 data ...
39 : // +----------------------------------------------
40 : // | column 1 data ...
41 : // +----------------------------------------------
42 : // | ...
43 : // +----------------------------------------------
44 : // | column n-1 data ...
45 : // +-------------+--------------------------------
46 : // | Unused (1B) |
47 : // +-------------+
48 : //
49 : // The encoding of column data itself is dependent on the data type.
50 : //
51 : // The trailing padding byte exists to allow columns to represent the end of
52 : // column data using a pointer to the byte after the end of the column. The
53 : // padding byte ensures that the pointer will always fall within the allocated
54 : // memory. Without the padding byte, such a pointer would be invalid according
55 : // to Go's pointer rules.
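//
// For illustration, assuming no custom header precedes the block, the layout
// above implies the following fixed offsets (a sketch that mirrors the
// decoding code later in this file, not a prescribed decoding path):
//
//	version  := Version(block[0])
//	numCols  := binary.LittleEndian.Uint16(block[1:])
//	numRows  := binary.LittleEndian.Uint32(block[3:])
//	// The i'th column header occupies 5 bytes beginning at offset 7+5*i.
//	dataType := DataType(block[7+5*i])
//	colStart := binary.LittleEndian.Uint32(block[7+5*i+1:])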
56 : //
57 : // # Alignment
58 : //
59 : // Block buffers are required to be word-aligned during both encoding and decoding.
60 : // This ensures that if any individual column or piece of data requires
61 : // word-alignment, the writer can align the offset into the block buffer
62 : // appropriately to ensure that the data is word-aligned.
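//
// For example, because the buffer's base address is word-aligned, a column of
// 64-bit values can be made 8-byte aligned simply by padding the column's
// starting offset within the block up to a multiple of 8.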
63 : //
64 : // # Keyspan blocks
65 : //
66 : // Blocks encoding key spans (range deletions, range keys) decompose the fields
67 : // of keyspan.Key into columns. Key spans are always stored fragmented, such
68 : // that all overlapping keys have identical bounds. When encoding key spans to a
69 : // columnar block, we take advantage of this fragmentation to store the set of
70 : // unique user key span boundaries in a separate column. This column does not
71 : // have the same number of rows as the other columns. Each individual fragment
72 : // stores the index of its start boundary user key within the user key column.
73 : //
74 : // For example, consider the three unfragmented spans:
75 : //
76 : // a-e:{(#0,RANGEDEL)}
77 : // b-d:{(#100,RANGEDEL)}
78 : // f-g:{(#20,RANGEDEL)}
79 : //
80 : // Once fragmented, these spans become five keyspan.Keys:
81 : //
82 : // a-b:{(#0,RANGEDEL)}
83 : // b-d:{(#100,RANGEDEL), (#0,RANGEDEL)}
84 : // d-e:{(#0,RANGEDEL)}
85 : // f-g:{(#20,RANGEDEL)}
86 : //
87 : // When encoded within a columnar keyspan block, the boundary columns (user key
88 : // and start indices) would hold six rows:
89 : //
90 : // +-----------------+-----------------+
91 : // | User key | Start index |
92 : // +-----------------+-----------------+
93 : // | a | 0 |
94 : // +-----------------+-----------------+
95 : // | b | 1 |
96 : // +-----------------+-----------------+
97 : // | d | 3 |
98 : // +-----------------+-----------------+
99 : // | e | 4 |
100 : // +-----------------+-----------------+
101 : // | f | 4 |
102 : // +-----------------+-----------------+
103 : // | g | 5 |
104 : // +-----------------+-----------------+
105 : //
106 : // The remaining keyspan.Key columns would look like:
107 : //
108 : // +-----------------+-----------------+-----------------+
109 : // | Trailer | Suffix | Value |
110 : // +-----------------+-----------------+-----------------+
111 : // | (#0,RANGEDEL) | - | - | (0)
112 : // +-----------------+-----------------+-----------------+
113 : // | (#100,RANGEDEL) | - | - | (1)
114 : // +-----------------+-----------------+-----------------+
115 : // | (#0,RANGEDEL) | - | - | (2)
116 : // +-----------------+-----------------+-----------------+
117 : // | (#0,RANGEDEL) | - | - | (3)
118 : // +-----------------+-----------------+-----------------+
119 : // | (#20,RANGEDEL) | - | - | (4)
120 : // +-----------------+-----------------+-----------------+
121 : //
122 : // This encoding does not explicitly encode the mapping of keyspan.Keys to
123 : // boundary keys. Rather, each boundary user key stores the index of the first
124 : // keyspan.Key whose start boundary is >= that user key. Readers look up the
125 : // start index for a fragment's start boundary (s) and its end boundary (e);
126 : // the keys at indexes [s,e) carry those bounds.
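//
// For example, using the tables above, the keys of the fragment [b,d) are
// found by looking up b (start index 1) and d (start index 3): rows [1,3) of
// the keyspan.Key columns, i.e. (#100,RANGEDEL) and (#0,RANGEDEL).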
127 : //
128 : // Both range deletions and range keys are encoded with the same schema. Range
129 : // deletion keyspan.Keys never contain suffixes or values. When such an empty
130 : // column is encoded, the RawBytes encoding uses uintEncodingAllZero to avoid
131 : // encoding N offsets, so each empty column occupies just 1 byte of column
132 : // data.
133 : package colblk
134 :
135 : import (
136 : "cmp"
137 : "encoding/binary"
138 : "fmt"
139 : "unsafe"
140 :
141 : "github.com/cockroachdb/crlib/crbytes"
142 : "github.com/cockroachdb/errors"
143 : "github.com/cockroachdb/pebble/internal/binfmt"
144 : "github.com/cockroachdb/pebble/internal/treeprinter"
145 : )
146 :
147 : // Version indicates the version of the columnar block format encoded. The
148 : // version byte is always the first byte within the block. This ensures that
149 : // readers can switch on the version before reading the rest of the block.
150 : type Version uint8
151 :
152 : const (
153 : // Version1 is the first version of the columnar block format.
154 : Version1 Version = 0x01
155 : )
156 :
157 : const blockHeaderBaseSize = 7
158 : const columnHeaderSize = 5
159 : const maxBlockRetainedSize = 256 << 10
160 :
161 : // Header holds the metadata extracted from a columnar block's header.
162 : type Header struct {
163 : Version Version
164 : // Columns holds the number of columns encoded within the block.
165 : Columns uint16
166 : // Rows holds the number of rows encoded within the block.
167 : Rows uint32
168 : }
169 :
170 : // String implements the fmt.Stringer interface, returning a human-readable
171 : // representation of the block header.
172 0 : func (h Header) String() string {
173 0 : return fmt.Sprintf("Version=%v; Columns=%d; Rows=%d", h.Version, h.Columns, h.Rows)
174 0 : }
175 :
176 : // Encode encodes the header to the provided buf. The length of buf must be at
177 : // least 7 bytes.
178 1 : func (h Header) Encode(buf []byte) {
179 1 : buf[0] = byte(h.Version)
180 1 : binary.LittleEndian.PutUint16(buf[1:], h.Columns)
181 1 : binary.LittleEndian.PutUint32(buf[1+align16:], h.Rows)
182 1 : }
183 :
184 : // blockHeaderSize returns the size of the block header, including column
185 : // headers, for a block with the specified number of columns and optionally a
186 : // custom header size.
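// For example, a block with 4 columns and no custom header reserves
// 7 + 4*5 = 27 bytes for the header.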
187 1 : func blockHeaderSize(cols int, customHeaderSize uint32) uint32 {
188 1 : // Each column has a 1-byte DataType and a 4-byte offset into the block.
189 1 : return uint32(blockHeaderBaseSize+cols*columnHeaderSize) + customHeaderSize
190 1 : }
191 :
192 : // DecodeHeader reads the block header from the provided serialized columnar
193 : // block.
194 1 : func DecodeHeader(data []byte) Header {
195 1 : return Header{
196 1 : Version: Version(data[0]),
197 1 : Columns: uint16(binary.LittleEndian.Uint16(data[1:])),
198 1 : Rows: uint32(binary.LittleEndian.Uint32(data[1+align16:])),
199 1 : }
200 1 : }
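
// exampleHeaderRoundTrip is a small illustrative sketch (not referenced by the
// rest of the package) showing the Encode/DecodeHeader pair in use: a Header
// is encoded into a blockHeaderBaseSize-byte buffer and decoded back. The
// field values are arbitrary.
func exampleHeaderRoundTrip() Header {
	h := Header{Version: Version1, Columns: 3, Rows: 128}
	buf := make([]byte, blockHeaderBaseSize)
	h.Encode(buf)
	// The decoded header carries the same version, column count, and row count.
	return DecodeHeader(buf)
}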
201 :
202 : // A blockEncoder encodes a columnar block and handles encoding the block's
203 : // header, including individual column headers.
204 : type blockEncoder struct {
205 : buf []byte
206 : headerOffset uint32
207 : pageOffset uint32
208 : }
209 :
210 1 : func (e *blockEncoder) reset() {
211 1 : if cap(e.buf) > maxBlockRetainedSize {
212 0 : e.buf = nil
213 0 : }
214 1 : e.headerOffset = 0
215 1 : e.pageOffset = 0
216 : }
217 :
218 : // init initializes the block encoder with a buffer of the specified size and
219 : // header.
220 1 : func (e *blockEncoder) init(size int, h Header, customHeaderSize uint32) {
221 1 : if cap(e.buf) < size {
222 1 : e.buf = crbytes.AllocAligned(size)
223 1 : } else {
224 1 : e.buf = e.buf[:size]
225 1 : }
226 1 : e.headerOffset = uint32(customHeaderSize) + blockHeaderBaseSize
227 1 : e.pageOffset = blockHeaderSize(int(h.Columns), customHeaderSize)
228 1 : h.Encode(e.buf[customHeaderSize:])
229 : }
230 :
231 : // data returns the underlying buffer.
232 1 : func (e *blockEncoder) data() []byte {
233 1 : return e.buf
234 1 : }
235 :
236 : // encode writes w's columns to the block.
237 1 : func (e *blockEncoder) encode(rows int, w ColumnWriter) {
238 1 : for i := 0; i < w.NumColumns(); i++ {
239 1 : e.buf[e.headerOffset] = byte(w.DataType(i))
240 1 : binary.LittleEndian.PutUint32(e.buf[e.headerOffset+1:], e.pageOffset)
241 1 : e.headerOffset += columnHeaderSize
242 1 : e.pageOffset = w.Finish(i, rows, e.pageOffset, e.buf)
243 1 : }
244 : }
245 :
246 : // finish finalizes the block encoding, returning the encoded block. The
247 : // returned byte slice points to the encoder's buffer, so if the encoder is
248 : // reused the returned slice will be invalidated.
249 1 : func (e *blockEncoder) finish() []byte {
250 1 : e.buf[e.pageOffset] = 0x00 // Padding byte
251 1 : e.pageOffset++
252 1 : if e.pageOffset != uint32(len(e.buf)) {
253 0 : panic(errors.AssertionFailedf("expected pageOffset=%d to equal size=%d", e.pageOffset, len(e.buf)))
254 : }
255 1 : return e.buf
256 : }
257 :
258 : // FinishBlock writes the columnar block to a heap-allocated byte slice.
259 : // FinishBlock assumes all columns have the same number of rows. If that's not
260 : // the case, the caller should manually construct their own block.
261 0 : func FinishBlock(rows int, writers []ColumnWriter) []byte {
262 0 : size := blockHeaderSize(len(writers), 0)
263 0 : nCols := uint16(0)
264 0 : for _, cw := range writers {
265 0 : size = cw.Size(rows, size)
266 0 : nCols += uint16(cw.NumColumns())
267 0 : }
268 0 : size++ // +1 for the trailing padding byte.
269 0 :
270 0 : var enc blockEncoder
271 0 : enc.init(int(size), Header{
272 0 : Version: Version1,
273 0 : Columns: nCols,
274 0 : Rows: uint32(rows),
275 0 : }, 0)
276 0 : for _, cw := range writers {
277 0 : enc.encode(rows, cw)
278 0 : }
279 0 : return enc.finish()
280 : }
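
// exampleFinishAndDecode is an illustrative sketch (not referenced elsewhere)
// of the intended round trip: FinishBlock encodes the writers' columns into a
// single block, and a BlockDecoder initialized over that block (with no custom
// header) exposes the columns through the typed accessors defined below.
func exampleFinishAndDecode(rows int, writers []ColumnWriter) BlockDecoder {
	block := FinishBlock(rows, writers)
	return DecodeBlock(block, 0 /* customHeaderSize */)
}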
281 :
282 : // DecodeColumn decodes the col'th column of the provided decoder's block as a
283 : // column of dataType using decodeFunc.
284 : func DecodeColumn[V any](
285 : d *BlockDecoder, col int, rows int, dataType DataType, decodeFunc DecodeFunc[V],
286 1 : ) V {
287 1 : if uint16(col) >= d.header.Columns {
288 0 : panic(errors.AssertionFailedf("column %d is out of range [0, %d)", col, d.header.Columns))
289 : }
290 1 : if dt := d.dataType(col); dt != dataType {
291 0 : panic(errors.AssertionFailedf("column %d is type %s; not %s", col, dt, dataType))
292 : }
293 1 : v, endOffset := decodeFunc(d.data, d.pageStart(col), rows)
294 1 : if nextColumnOff := d.pageStart(col + 1); endOffset != nextColumnOff {
295 0 : panic(errors.AssertionFailedf("column %d decoded to offset %d; expected %d", col, endOffset, nextColumnOff))
296 : }
297 1 : return v
298 : }
299 :
300 : // A BlockDecoder holds metadata for accessing the columns of a columnar block.
301 : type BlockDecoder struct {
302 : data []byte
303 : header Header
304 : customHeaderSize uint32
305 : }
306 :
307 : // DecodeBlock decodes the header of the provided columnar block and returns a
308 : // new BlockDecoder configured to read from the block. The caller must ensure
309 : // that the data is formatted according to the block layout specification.
310 0 : func DecodeBlock(data []byte, customHeaderSize uint32) BlockDecoder {
311 0 : d := BlockDecoder{}
312 0 : d.Init(data, customHeaderSize)
313 0 : return d
314 0 : }
315 :
316 : // Init initializes a BlockDecoder with the data contained in the provided block.
317 1 : func (d *BlockDecoder) Init(data []byte, customHeaderSize uint32) {
318 1 : *d = BlockDecoder{
319 1 : data: data,
320 1 : header: DecodeHeader(data[customHeaderSize:]),
321 1 : customHeaderSize: customHeaderSize,
322 1 : }
323 1 : }
324 :
325 : // Rows returns the number of rows in the block, as indicated by the block header.
326 0 : func (d *BlockDecoder) Rows() int {
327 0 : return int(d.header.Rows)
328 0 : }
329 :
330 : // DataType returns the data type of the col'th column. Every column's data type
331 : // is encoded within the block header.
332 0 : func (d *BlockDecoder) DataType(col int) DataType {
333 0 : if uint16(col) >= d.header.Columns {
334 0 : panic(errors.AssertionFailedf("column %d is out of range [0, %d)", col, d.header.Columns))
335 : }
336 0 : return d.dataType(col)
337 : }
338 :
339 1 : func (d *BlockDecoder) dataType(col int) DataType {
340 1 : return DataType(*(*uint8)(d.pointer(d.customHeaderSize + blockHeaderBaseSize + columnHeaderSize*uint32(col))))
341 1 : }
342 :
343 : // Bitmap retrieves the col'th column as a bitmap. The column must be of type
344 : // DataTypeBool.
345 1 : func (d *BlockDecoder) Bitmap(col int) Bitmap {
346 1 : return DecodeColumn(d, col, int(d.header.Rows), DataTypeBool, DecodeBitmap)
347 1 : }
348 :
349 : // RawBytes retrieves the col'th column as a column of byte slices. The column
350 : // must be of type DataTypeBytes.
351 1 : func (d *BlockDecoder) RawBytes(col int) RawBytes {
352 1 : return DecodeColumn(d, col, int(d.header.Rows), DataTypeBytes, DecodeRawBytes)
353 1 : }
354 :
355 : // PrefixBytes retrieves the col'th column as a prefix-compressed byte slice
356 : // column. The column must be of type DataTypePrefixBytes.
357 1 : func (d *BlockDecoder) PrefixBytes(col int) PrefixBytes {
358 1 : return DecodeColumn(d, col, int(d.header.Rows), DataTypePrefixBytes, DecodePrefixBytes)
359 1 : }
360 :
361 : // Uints retrieves the col'th column as a column of uints. The column must be
362 : // of type DataTypeUint.
363 1 : func (d *BlockDecoder) Uints(col int) UnsafeUints {
364 1 : return DecodeColumn(d, col, int(d.header.Rows), DataTypeUint, DecodeUnsafeUints)
365 1 : }
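
// exampleReadUintColumn is a usage sketch of the typed accessors: given a
// decoder whose column 0 holds uints, it reads back a single value. It assumes
// UnsafeUints exposes an At accessor, as defined elsewhere in this package.
func exampleReadUintColumn(d *BlockDecoder, row int) uint64 {
	col := d.Uints(0) // panics if column 0 is not of type DataTypeUint
	return col.At(row)
}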
366 :
367 1 : func (d *BlockDecoder) pageStart(col int) uint32 {
368 1 : if uint16(col) >= d.header.Columns {
369 1 : // -1 for the trailing padding byte
370 1 : return uint32(len(d.data) - 1)
371 1 : }
372 1 : return binary.LittleEndian.Uint32(
373 1 : unsafe.Slice((*byte)(d.pointer(d.customHeaderSize+uint32(blockHeaderBaseSize+columnHeaderSize*col+1))), 4))
374 : }
375 :
376 1 : func (d *BlockDecoder) pointer(offset uint32) unsafe.Pointer {
377 1 : return unsafe.Pointer(uintptr(unsafe.Pointer(&d.data[0])) + uintptr(offset))
378 1 : }
379 :
380 : // FormattedString returns a formatted representation of the block's binary
381 : // data.
382 0 : func (d *BlockDecoder) FormattedString() string {
383 0 : f := binfmt.New(d.data)
384 0 : tp := treeprinter.New()
385 0 : n := tp.Child("block")
386 0 : d.headerToBinFormatter(f, n)
387 0 : for i := 0; i < int(d.header.Columns); i++ {
388 0 : d.columnToBinFormatter(f, n, i, int(d.header.Rows))
389 0 : }
390 0 : f.HexBytesln(1, "block trailer padding")
391 0 : f.ToTreePrinter(n)
392 0 : return tp.String()
393 : }
394 :
395 0 : func (d *BlockDecoder) headerToBinFormatter(f *binfmt.Formatter, tp treeprinter.Node) {
396 0 : f.HexBytesln(1, "version %v", Version(f.PeekUint(1)))
397 0 : f.HexBytesln(2, "%d columns", d.header.Columns)
398 0 : f.HexBytesln(4, "%d rows", d.header.Rows)
399 0 : for i := 0; i < int(d.header.Columns); i++ {
400 0 : f.Byte("col %d: %s", i, d.DataType(i))
401 0 : f.HexBytesln(4, "col %d: page start %d", i, d.pageStart(i))
402 0 : }
403 0 : f.ToTreePrinter(tp.Child("columnar block header"))
404 : }
405 :
406 : func (d *BlockDecoder) formatColumn(
407 : f *binfmt.Formatter,
408 : tp treeprinter.Node,
409 : col int,
410 : fn func(*binfmt.Formatter, treeprinter.Node, DataType),
411 0 : ) {
412 0 : dataType := d.DataType(col)
413 0 : colSize := d.pageStart(col+1) - d.pageStart(col)
414 0 : endOff := f.Offset() + int(colSize)
415 0 : fn(f, tp, dataType)
416 0 :
417 0 : // We expect formatting the column data to have consumed all the bytes
418 0 : // between the column's pageOffset and the next column's pageOffset.
419 0 : switch v := endOff - f.Offset(); cmp.Compare[int](v, 0) {
420 0 : case +1:
421 0 : panic(fmt.Sprintf("expected f.Offset() = %d, but found %d; did column %s format too few bytes?", endOff, f.Offset(), dataType))
422 0 : case 0:
423 0 : case -1:
424 0 : panic(fmt.Sprintf("expected f.Offset() = %d, but found %d; did column %s format too many bytes?", endOff, f.Offset(), dataType))
425 : }
426 : }
427 :
428 : func (d *BlockDecoder) columnToBinFormatter(
429 : f *binfmt.Formatter, tp treeprinter.Node, col, rows int,
430 0 : ) {
431 0 : d.formatColumn(f, tp, col, func(f *binfmt.Formatter, tp treeprinter.Node, dataType DataType) {
432 0 : n := tp.Childf("data for column %d (%s)", col, dataType)
433 0 : switch dataType {
434 0 : case DataTypeBool:
435 0 : bitmapToBinFormatter(f, n, rows)
436 0 : case DataTypeUint:
437 0 : uintsToBinFormatter(f, n, rows, nil)
438 0 : case DataTypePrefixBytes:
439 0 : prefixBytesToBinFormatter(f, n, rows, nil)
440 0 : case DataTypeBytes:
441 0 : rawBytesToBinFormatter(f, n, rows, nil)
442 0 : default:
443 0 : panic("unimplemented")
444 : }
445 : })
446 :
447 : }