Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/binfmt"
20 : "github.com/cockroachdb/pebble/internal/bytealloc"
21 : "github.com/cockroachdb/pebble/internal/sstableinternal"
22 : "github.com/cockroachdb/pebble/internal/treeprinter"
23 : "github.com/cockroachdb/pebble/objstorage"
24 : "github.com/cockroachdb/pebble/sstable/block"
25 : "github.com/cockroachdb/pebble/sstable/colblk"
26 : "github.com/cockroachdb/pebble/sstable/rowblk"
27 : )
28 :
29 : // Layout describes the block organization of an sstable.
30 : type Layout struct {
31 : // NOTE: changes to fields in this struct should also be reflected in
32 : // ValidateBlockChecksums, which validates a static list of BlockHandles
33 : // referenced in this struct.
34 :
35 : Data []block.HandleWithProperties
36 : Index []block.Handle
37 : TopIndex block.Handle
38 : Filter []NamedBlockHandle
39 : RangeDel block.Handle
40 : RangeKey block.Handle
41 : ValueBlock []block.Handle
42 : ValueIndex block.Handle
43 : Properties block.Handle
44 : MetaIndex block.Handle
45 : Footer block.Handle
46 : Format TableFormat
47 : }
48 :
49 : // NamedBlockHandle holds a block.Handle and corresponding name.
50 : type NamedBlockHandle struct {
51 : block.Handle
52 : Name string
53 : }
54 :
55 : // FilterByName retrieves the block handle of the named filter, if it exists.
56 : // The provided the name should be the name as it appears in the metaindex
57 : // block.
58 1 : func (l *Layout) FilterByName(name string) (block.Handle, bool) {
59 1 : for i := range l.Filter {
60 1 : if l.Filter[i].Name == name {
61 1 : return l.Filter[i].Handle, true
62 1 : }
63 : }
64 1 : return block.Handle{}, false
65 : }
66 :
67 1 : func (l *Layout) orderedBlocks() []NamedBlockHandle {
68 1 : var blocks []NamedBlockHandle
69 1 : for i := range l.Data {
70 1 : blocks = append(blocks, NamedBlockHandle{l.Data[i].Handle, "data"})
71 1 : }
72 1 : for i := range l.Index {
73 1 : blocks = append(blocks, NamedBlockHandle{l.Index[i], "index"})
74 1 : }
75 1 : if l.TopIndex.Length != 0 {
76 1 : blocks = append(blocks, NamedBlockHandle{l.TopIndex, "top-index"})
77 1 : }
78 1 : blocks = append(blocks, l.Filter...)
79 1 : if l.RangeDel.Length != 0 {
80 1 : blocks = append(blocks, NamedBlockHandle{l.RangeDel, "range-del"})
81 1 : }
82 1 : if l.RangeKey.Length != 0 {
83 1 : blocks = append(blocks, NamedBlockHandle{l.RangeKey, "range-key"})
84 1 : }
85 1 : for i := range l.ValueBlock {
86 1 : blocks = append(blocks, NamedBlockHandle{l.ValueBlock[i], "value-block"})
87 1 : }
88 1 : if l.ValueIndex.Length != 0 {
89 1 : blocks = append(blocks, NamedBlockHandle{l.ValueIndex, "value-index"})
90 1 : }
91 1 : if l.Properties.Length != 0 {
92 1 : blocks = append(blocks, NamedBlockHandle{l.Properties, "properties"})
93 1 : }
94 1 : if l.MetaIndex.Length != 0 {
95 1 : blocks = append(blocks, NamedBlockHandle{l.MetaIndex, "meta-index"})
96 1 : }
97 1 : if l.Footer.Length != 0 {
98 1 : if l.Footer.Length == levelDBFooterLen {
99 1 : blocks = append(blocks, NamedBlockHandle{l.Footer, "leveldb-footer"})
100 1 : } else {
101 1 : blocks = append(blocks, NamedBlockHandle{l.Footer, "footer"})
102 1 : }
103 : }
104 1 : slices.SortFunc(blocks, func(a, b NamedBlockHandle) int {
105 1 : return cmp.Compare(a.Offset, b.Offset)
106 1 : })
107 1 : return blocks
108 : }
109 :
110 : // Describe returns a description of the layout. If the verbose parameter is
111 : // true, details of the structure of each block are returned as well.
112 : // If verbose is true and fmtKV is non-nil, the output includes the KVs (as formatted by this function).
113 : func (l *Layout) Describe(
114 : verbose bool, r *Reader, fmtKV func(key *base.InternalKey, value []byte) string,
115 1 : ) string {
116 1 : ctx := context.TODO()
117 1 :
118 1 : blocks := l.orderedBlocks()
119 1 : formatting := rowblkFormatting
120 1 : if l.Format.BlockColumnar() {
121 1 : formatting = colblkFormatting
122 1 : }
123 :
124 1 : tp := treeprinter.New()
125 1 : root := tp.Child("sstable")
126 1 :
127 1 : for i := range blocks {
128 1 : b := &blocks[i]
129 1 : tpNode := root.Childf("%s offset: %d length: %d", b.Name, b.Offset, b.Length)
130 1 :
131 1 : if !verbose {
132 1 : continue
133 : }
134 1 : if b.Name == "filter" {
135 0 : continue
136 : }
137 :
138 1 : if b.Name == "footer" || b.Name == "leveldb-footer" {
139 1 : trailer, offset := make([]byte, b.Length), 0
140 1 : _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset))
141 1 :
142 1 : if b.Name == "footer" {
143 1 : checksumType := block.ChecksumType(trailer[0])
144 1 : tpNode.Childf("%03d checksum type: %s", offset, checksumType)
145 1 : trailer, offset = trailer[1:], offset+1
146 1 : }
147 :
148 1 : metaHandle, n := binary.Uvarint(trailer)
149 1 : metaLen, m := binary.Uvarint(trailer[n:])
150 1 : tpNode.Childf("%03d meta: offset=%d, length=%d", offset, metaHandle, metaLen)
151 1 : trailer, offset = trailer[n+m:], offset+n+m
152 1 :
153 1 : indexHandle, n := binary.Uvarint(trailer)
154 1 : indexLen, m := binary.Uvarint(trailer[n:])
155 1 : tpNode.Childf("%03d index: offset=%d, length=%d", offset, indexHandle, indexLen)
156 1 : trailer, offset = trailer[n+m:], offset+n+m
157 1 :
158 1 : trailing := 12
159 1 : if b.Name == "leveldb-footer" {
160 0 : trailing = 8
161 0 : }
162 :
163 1 : offset += len(trailer) - trailing
164 1 : trailer = trailer[len(trailer)-trailing:]
165 1 :
166 1 : if b.Name == "footer" {
167 1 : version := trailer[:4]
168 1 : tpNode.Childf("%03d version: %d", offset, binary.LittleEndian.Uint32(version))
169 1 : trailer, offset = trailer[4:], offset+4
170 1 : }
171 :
172 1 : magicNumber := trailer
173 1 : tpNode.Childf("%03d magic number: 0x%x", offset, magicNumber)
174 1 :
175 1 : continue
176 : }
177 :
178 : // Read the block and format it. Returns an error if we couldn't read the
179 : // block.
180 1 : err := func() error {
181 1 : var err error
182 1 : var h block.BufferHandle
183 1 : // Defer release of any block handle that will have been read.
184 1 : defer func() { h.Release() }()
185 :
186 1 : switch b.Name {
187 1 : case "data":
188 1 : h, err = r.readDataBlock(ctx, noEnv, noReadHandle, b.Handle)
189 1 : if err != nil {
190 0 : return err
191 0 : }
192 1 : if fmtKV == nil {
193 1 : formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), nil)
194 1 : } else {
195 1 : var lastKey InternalKey
196 1 : formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), func(key *base.InternalKey, value []byte) string {
197 1 : v := fmtKV(key, value)
198 1 : if base.InternalCompare(r.Compare, lastKey, *key) >= 0 {
199 1 : v += " WARNING: OUT OF ORDER KEYS!"
200 1 : }
201 1 : lastKey.Trailer = key.Trailer
202 1 : lastKey.UserKey = append(lastKey.UserKey[:0], key.UserKey...)
203 1 : return v
204 : })
205 : }
206 :
207 1 : case "range-del":
208 1 : h, err = r.readRangeDelBlock(ctx, noEnv, noReadHandle, b.Handle)
209 1 : if err != nil {
210 0 : return err
211 0 : }
212 : // TODO(jackson): colblk ignores fmtKV, because it doesn't
213 : // make sense in the context.
214 1 : formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
215 :
216 1 : case "range-key":
217 1 : h, err = r.readRangeKeyBlock(ctx, noEnv, noReadHandle, b.Handle)
218 1 : if err != nil {
219 0 : return err
220 0 : }
221 : // TODO(jackson): colblk ignores fmtKV, because it doesn't
222 : // make sense in the context.
223 1 : formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
224 :
225 1 : case "index", "top-index":
226 1 : h, err = r.readIndexBlock(ctx, noEnv, noReadHandle, b.Handle)
227 1 : if err != nil {
228 0 : return err
229 0 : }
230 1 : formatting.formatIndexBlock(tpNode, r, *b, h.BlockData())
231 :
232 1 : case "properties":
233 1 : h, err = r.readBlockInternal(ctx, noEnv, noReadHandle, b.Handle, noInitBlockMetadataFn)
234 1 : if err != nil {
235 0 : return err
236 0 : }
237 1 : iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
238 1 : iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
239 1 : fmt.Fprintf(w, "%05d %s (%d)", enc.Offset, key.UserKey, enc.Length)
240 1 : })
241 :
242 1 : case "meta-index":
243 1 : if b.Handle != r.metaindexBH {
244 0 : return base.AssertionFailedf("range-del block handle does not match rangeDelBH")
245 0 : }
246 1 : h, err = r.readMetaindexBlock(ctx, noEnv, noReadHandle)
247 1 : if err != nil {
248 0 : return err
249 0 : }
250 1 : iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
251 1 : iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
252 1 : var bh block.Handle
253 1 : var n int
254 1 : var vbih valueBlocksIndexHandle
255 1 : isValueBlocksIndexHandle := false
256 1 : if bytes.Equal(iter.Key().UserKey, []byte(metaValueIndexName)) {
257 1 : vbih, n, err = decodeValueBlocksIndexHandle(value)
258 1 : bh = vbih.h
259 1 : isValueBlocksIndexHandle = true
260 1 : } else {
261 1 : bh, n = block.DecodeHandle(value)
262 1 : }
263 1 : if n == 0 || n != len(value) {
264 0 : fmt.Fprintf(w, "%04d [err: %s]\n", enc.Offset, err)
265 0 : return
266 0 : }
267 1 : var vbihStr string
268 1 : if isValueBlocksIndexHandle {
269 1 : vbihStr = fmt.Sprintf(" value-blocks-index-lengths: %d(num), %d(offset), %d(length)",
270 1 : vbih.blockNumByteLength, vbih.blockOffsetByteLength, vbih.blockLengthByteLength)
271 1 : }
272 1 : fmt.Fprintf(w, "%04d %s block:%d/%d%s",
273 1 : uint64(enc.Offset), iter.Key().UserKey, bh.Offset, bh.Length, vbihStr)
274 : })
275 :
276 1 : case "value-block":
277 : // We don't peer into the value-block since it can't be interpreted
278 : // without the valueHandles.
279 1 : case "value-index":
280 : // We have already read the value-index to construct the list of
281 : // value-blocks, so no need to do it again.
282 : }
283 :
284 : // Format the trailer.
285 1 : trailer := make([]byte, block.TrailerLen)
286 1 : _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset+b.Length))
287 1 : algo := block.CompressionIndicator(trailer[0])
288 1 : checksum := binary.LittleEndian.Uint32(trailer[1:])
289 1 : tpNode.Childf("trailer [compression=%s checksum=0x%04x]", algo, checksum)
290 1 : return nil
291 : }()
292 1 : if err != nil {
293 0 : tpNode.Childf("error reading block: %v", err)
294 0 : }
295 : }
296 1 : return tp.String()
297 : }
298 :
299 : type blockFormatting struct {
300 : formatIndexBlock formatBlockFunc
301 : formatDataBlock formatBlockFuncKV
302 : formatKeyspanBlock formatBlockFuncKV
303 : }
304 :
305 : type (
306 : formatBlockFunc func(treeprinter.Node, *Reader, NamedBlockHandle, []byte) error
307 : formatBlockFuncKV func(treeprinter.Node, *Reader, NamedBlockHandle, []byte, func(*base.InternalKey, []byte) string) error
308 : )
309 :
310 : var (
311 : rowblkFormatting = blockFormatting{
312 : formatIndexBlock: formatRowblkIndexBlock,
313 : formatDataBlock: formatRowblkDataBlock,
314 : formatKeyspanBlock: formatRowblkDataBlock,
315 : }
316 : colblkFormatting = blockFormatting{
317 : formatIndexBlock: formatColblkIndexBlock,
318 : formatDataBlock: formatColblkDataBlock,
319 : formatKeyspanBlock: formatColblkKeyspanBlock,
320 : }
321 : )
322 :
323 1 : func formatColblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
324 1 : var iter colblk.IndexIter
325 1 : if err := iter.Init(r.Compare, r.Split, data, NoTransforms); err != nil {
326 0 : return err
327 0 : }
328 1 : defer iter.Close()
329 1 : i := 0
330 1 : for v := iter.First(); v; v = iter.Next() {
331 1 : bh, err := iter.BlockHandleWithProperties()
332 1 : if err != nil {
333 0 : return err
334 0 : }
335 1 : tp.Childf("%05d block:%d/%d\n", i, bh.Offset, bh.Length)
336 1 : i++
337 : }
338 1 : return nil
339 : }
340 :
341 : func formatColblkDataBlock(
342 : tp treeprinter.Node,
343 : r *Reader,
344 : b NamedBlockHandle,
345 : data []byte,
346 : fmtKV func(key *base.InternalKey, value []byte) string,
347 1 : ) error {
348 1 : var decoder colblk.DataBlockDecoder
349 1 : decoder.Init(r.keySchema, data)
350 1 : f := binfmt.New(data)
351 1 : decoder.Describe(f, tp)
352 1 :
353 1 : if fmtKV != nil {
354 1 : var iter colblk.DataBlockIter
355 1 : iter.InitOnce(r.keySchema, r.Compare, r.Split, describingLazyValueHandler{})
356 1 : if err := iter.Init(&decoder, block.IterTransforms{}); err != nil {
357 0 : return err
358 0 : }
359 1 : defer iter.Close()
360 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
361 1 : tp.Child(fmtKV(&kv.K, kv.V.ValueOrHandle))
362 1 : }
363 : }
364 1 : return nil
365 : }
366 :
367 : // describingLazyValueHandler is a block.GetLazyValueForPrefixAndValueHandler
368 : // that replaces a value handle with an in-place value describing the handle.
369 : type describingLazyValueHandler struct{}
370 :
371 : // Assert that debugLazyValueHandler implements the
372 : // block.GetLazyValueForPrefixAndValueHandler interface.
373 : var _ block.GetLazyValueForPrefixAndValueHandler = describingLazyValueHandler{}
374 :
375 : func (describingLazyValueHandler) GetLazyValueForPrefixAndValueHandle(
376 : handle []byte,
377 1 : ) base.LazyValue {
378 1 : vh := decodeValueHandle(handle[1:])
379 1 : return base.LazyValue{ValueOrHandle: []byte(fmt.Sprintf("value handle %+v", vh))}
380 1 : }
381 :
382 : func formatColblkKeyspanBlock(
383 : tp treeprinter.Node,
384 : r *Reader,
385 : b NamedBlockHandle,
386 : data []byte,
387 : _ func(*base.InternalKey, []byte) string,
388 1 : ) error {
389 1 : var decoder colblk.KeyspanDecoder
390 1 : decoder.Init(data)
391 1 : f := binfmt.New(data)
392 1 : decoder.Describe(f, tp)
393 1 : return nil
394 1 : }
395 :
396 1 : func formatRowblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
397 1 : iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
398 1 : if err != nil {
399 0 : return err
400 0 : }
401 1 : iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
402 1 : bh, err := block.DecodeHandleWithProperties(value)
403 1 : if err != nil {
404 0 : fmt.Fprintf(w, "%05d [err: %s]\n", enc.Offset, err)
405 0 : return
406 0 : }
407 1 : fmt.Fprintf(w, "%05d block:%d/%d", enc.Offset, bh.Offset, bh.Length)
408 1 : if enc.IsRestart {
409 1 : fmt.Fprintf(w, " [restart]")
410 1 : }
411 : })
412 1 : return nil
413 : }
414 :
415 : func formatRowblkDataBlock(
416 : tp treeprinter.Node,
417 : r *Reader,
418 : b NamedBlockHandle,
419 : data []byte,
420 : fmtRecord func(key *base.InternalKey, value []byte) string,
421 1 : ) error {
422 1 : iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
423 1 : if err != nil {
424 0 : return err
425 0 : }
426 1 : iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
427 1 : // The format of the numbers in the record line is:
428 1 : //
429 1 : // (<total> = <length> [<shared>] + <unshared> + <value>)
430 1 : //
431 1 : // <total> is the total number of bytes for the record.
432 1 : // <length> is the size of the 3 varint encoded integers for <shared>,
433 1 : // <unshared>, and <value>.
434 1 : // <shared> is the number of key bytes shared with the previous key.
435 1 : // <unshared> is the number of unshared key bytes.
436 1 : // <value> is the number of value bytes.
437 1 : fmt.Fprintf(w, "%05d record (%d = %d [%d] + %d + %d)",
438 1 : uint64(enc.Offset), enc.Length,
439 1 : enc.Length-int32(enc.KeyUnshared+enc.ValueLen), enc.KeyShared, enc.KeyUnshared, enc.ValueLen)
440 1 : if enc.IsRestart {
441 1 : fmt.Fprint(w, " [restart]")
442 1 : }
443 1 : if fmtRecord != nil {
444 1 : if r.tableFormat < TableFormatPebblev3 || key.Kind() != InternalKeyKindSet {
445 1 : fmt.Fprintf(w, "\n %s", fmtRecord(key, value))
446 1 : } else if !block.ValuePrefix(value[0]).IsValueHandle() {
447 1 : fmt.Fprintf(w, "\n %s", fmtRecord(key, value[1:]))
448 1 : } else {
449 1 : vh := decodeValueHandle(value[1:])
450 1 : fmt.Fprintf(w, "\n %s", fmtRecord(key, []byte(fmt.Sprintf("value handle %+v", vh))))
451 1 : }
452 : }
453 : })
454 1 : return nil
455 : }
456 :
457 1 : func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
458 1 : foot, err := parseFooter(data, 0, int64(len(data)))
459 1 : if err != nil {
460 0 : return Layout{}, err
461 0 : }
462 1 : decompressedMeta, err := decompressInMemory(data, foot.metaindexBH)
463 1 : if err != nil {
464 0 : return Layout{}, errors.Wrap(err, "decompressing metaindex")
465 0 : }
466 1 : meta, vbih, err := decodeMetaindex(decompressedMeta)
467 1 : if err != nil {
468 0 : return Layout{}, err
469 0 : }
470 1 : layout := Layout{
471 1 : MetaIndex: foot.metaindexBH,
472 1 : Properties: meta[metaPropertiesName],
473 1 : RangeDel: meta[metaRangeDelV2Name],
474 1 : RangeKey: meta[metaRangeKeyName],
475 1 : ValueIndex: vbih.h,
476 1 : Footer: foot.footerBH,
477 1 : Format: foot.format,
478 1 : }
479 1 : var props Properties
480 1 : decompressedProps, err := decompressInMemory(data, layout.Properties)
481 1 : if err != nil {
482 0 : return Layout{}, errors.Wrap(err, "decompressing properties")
483 0 : }
484 1 : if err := props.load(decompressedProps, map[string]struct{}{}); err != nil {
485 0 : return Layout{}, err
486 0 : }
487 :
488 1 : if props.IndexType == twoLevelIndex {
489 1 : decompressed, err := decompressInMemory(data, foot.indexBH)
490 1 : if err != nil {
491 0 : return Layout{}, errors.Wrap(err, "decompressing two-level index")
492 0 : }
493 1 : layout.TopIndex = foot.indexBH
494 1 : topLevelIter, err := newIndexIter(foot.format, comparer, decompressed)
495 1 : if err != nil {
496 0 : return Layout{}, err
497 0 : }
498 1 : err = forEachIndexEntry(topLevelIter, func(bhp block.HandleWithProperties) {
499 1 : layout.Index = append(layout.Index, bhp.Handle)
500 1 : })
501 1 : if err != nil {
502 0 : return Layout{}, err
503 0 : }
504 0 : } else {
505 0 : layout.Index = append(layout.Index, foot.indexBH)
506 0 : }
507 1 : for _, indexBH := range layout.Index {
508 1 : decompressed, err := decompressInMemory(data, indexBH)
509 1 : if err != nil {
510 0 : return Layout{}, errors.Wrap(err, "decompressing index block")
511 0 : }
512 1 : indexIter, err := newIndexIter(foot.format, comparer, decompressed)
513 1 : if err != nil {
514 0 : return Layout{}, err
515 0 : }
516 1 : err = forEachIndexEntry(indexIter, func(bhp block.HandleWithProperties) {
517 1 : layout.Data = append(layout.Data, bhp)
518 1 : })
519 1 : if err != nil {
520 0 : return Layout{}, err
521 0 : }
522 : }
523 :
524 1 : if layout.ValueIndex.Length > 0 {
525 0 : vbiBlock, err := decompressInMemory(data, layout.ValueIndex)
526 0 : if err != nil {
527 0 : return Layout{}, errors.Wrap(err, "decompressing value index")
528 0 : }
529 0 : layout.ValueBlock, err = decodeValueBlockIndex(vbiBlock, vbih)
530 0 : if err != nil {
531 0 : return Layout{}, err
532 0 : }
533 : }
534 :
535 1 : return layout, nil
536 : }
537 :
538 1 : func decompressInMemory(data []byte, bh block.Handle) ([]byte, error) {
539 1 : typ := block.CompressionIndicator(data[bh.Offset+bh.Length])
540 1 : var decompressed []byte
541 1 : if typ == block.NoCompressionIndicator {
542 1 : return data[bh.Offset : bh.Offset+bh.Length], nil
543 1 : }
544 : // Decode the length of the decompressed value.
545 1 : decodedLen, prefixLen, err := block.DecompressedLen(typ, data[bh.Offset:bh.Offset+bh.Length])
546 1 : if err != nil {
547 0 : return nil, err
548 0 : }
549 1 : decompressed = make([]byte, decodedLen)
550 1 : if err := block.DecompressInto(typ, data[int(bh.Offset)+prefixLen:bh.Offset+bh.Length], decompressed); err != nil {
551 0 : return nil, err
552 0 : }
553 1 : return decompressed, nil
554 : }
555 :
556 : func newIndexIter(
557 : tableFormat TableFormat, comparer *base.Comparer, data []byte,
558 1 : ) (block.IndexBlockIterator, error) {
559 1 : var iter block.IndexBlockIterator
560 1 : var err error
561 1 : if tableFormat <= TableFormatPebblev4 {
562 1 : iter = new(rowblk.IndexIter)
563 1 : err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
564 1 : } else {
565 1 : iter = new(colblk.IndexIter)
566 1 : err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
567 1 : }
568 1 : if err != nil {
569 0 : return nil, err
570 0 : }
571 1 : return iter, nil
572 : }
573 :
574 : func forEachIndexEntry(
575 : indexIter block.IndexBlockIterator, fn func(block.HandleWithProperties),
576 1 : ) error {
577 1 : for v := indexIter.First(); v; v = indexIter.Next() {
578 1 : bhp, err := indexIter.BlockHandleWithProperties()
579 1 : if err != nil {
580 0 : return err
581 0 : }
582 1 : fn(bhp)
583 : }
584 1 : return indexIter.Close()
585 : }
586 :
587 : func decodeMetaindex(
588 : data []byte,
589 2 : ) (meta map[string]block.Handle, vbih valueBlocksIndexHandle, err error) {
590 2 : i, err := rowblk.NewRawIter(bytes.Compare, data)
591 2 : if err != nil {
592 0 : return nil, valueBlocksIndexHandle{}, err
593 0 : }
594 2 : defer func() { err = firstError(err, i.Close()) }()
595 :
596 2 : meta = map[string]block.Handle{}
597 2 : for valid := i.First(); valid; valid = i.Next() {
598 2 : value := i.Value()
599 2 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
600 2 : var n int
601 2 : vbih, n, err = decodeValueBlocksIndexHandle(i.Value())
602 2 : if err != nil {
603 0 : return nil, vbih, err
604 0 : }
605 2 : if n == 0 || n != len(value) {
606 0 : return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
607 0 : }
608 2 : } else {
609 2 : bh, n := block.DecodeHandle(value)
610 2 : if n == 0 || n != len(value) {
611 0 : return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
612 0 : }
613 2 : meta[string(i.Key().UserKey)] = bh
614 : }
615 : }
616 2 : return meta, vbih, nil
617 : }
618 :
619 2 : func decodeValueBlockIndex(data []byte, vbih valueBlocksIndexHandle) ([]block.Handle, error) {
620 2 : var valueBlocks []block.Handle
621 2 : indexEntryLen := int(vbih.blockNumByteLength + vbih.blockOffsetByteLength +
622 2 : vbih.blockLengthByteLength)
623 2 : i := 0
624 2 : for len(data) != 0 {
625 2 : if len(data) < indexEntryLen {
626 0 : return nil, errors.Errorf(
627 0 : "remaining value index block %d does not contain a full entry of length %d",
628 0 : len(data), indexEntryLen)
629 0 : }
630 2 : n := int(vbih.blockNumByteLength)
631 2 : bn := int(littleEndianGet(data, n))
632 2 : if bn != i {
633 0 : return nil, errors.Errorf("unexpected block num %d, expected %d",
634 0 : bn, i)
635 0 : }
636 2 : i++
637 2 : data = data[n:]
638 2 : n = int(vbih.blockOffsetByteLength)
639 2 : blockOffset := littleEndianGet(data, n)
640 2 : data = data[n:]
641 2 : n = int(vbih.blockLengthByteLength)
642 2 : blockLen := littleEndianGet(data, n)
643 2 : data = data[n:]
644 2 : valueBlocks = append(valueBlocks, block.Handle{Offset: blockOffset, Length: blockLen})
645 : }
646 2 : return valueBlocks, nil
647 : }
648 :
649 : // layoutWriter writes the structure of an sstable to durable storage. It
650 : // accepts serialized blocks, writes them to storage and returns a block handle
651 : // describing the offset and length of the block.
652 : type layoutWriter struct {
653 : writable objstorage.Writable
654 :
655 : // cacheOpts are used to remove blocks written to the sstable from the cache,
656 : // providing a defense in depth against bugs which cause cache collisions.
657 : cacheOpts sstableinternal.CacheOptions
658 :
659 : // options copied from WriterOptions
660 : tableFormat TableFormat
661 : compression block.Compression
662 : checksumType block.ChecksumType
663 :
664 : // offset tracks the current write offset within the writable.
665 : offset uint64
666 : // lastIndexBlockHandle holds the handle to the most recently-written index
667 : // block. It's updated by writeIndexBlock. When writing sstables with a
668 : // single-level index, this field will be updated once. When writing
669 : // sstables with a two-level index, the last update will set the two-level
670 : // index.
671 : lastIndexBlockHandle block.Handle
672 : handles []metaIndexHandle
673 : handlesBuf bytealloc.A
674 : tmp [blockHandleLikelyMaxLen]byte
675 : buf blockBuf
676 : }
677 :
678 2 : func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
679 2 : return layoutWriter{
680 2 : writable: w,
681 2 : cacheOpts: opts.internal.CacheOpts,
682 2 : tableFormat: opts.TableFormat,
683 2 : compression: opts.Compression,
684 2 : checksumType: opts.Checksum,
685 2 : buf: blockBuf{
686 2 : checksummer: block.Checksummer{Type: opts.Checksum},
687 2 : },
688 2 : }
689 2 : }
690 :
// metaIndexHandle is a pending metaindex entry: the entry's key together with
// the already-encoded handle that will be stored as its value.
type metaIndexHandle struct {
	// key is the metaindex key (the block's name).
	key string
	// encodedBlockHandle is the encoded handle stored under key.
	encodedBlockHandle []byte
}
695 :
696 : // Abort aborts writing the table, aborting the underlying writable too. Abort
697 : // is idempotent.
698 2 : func (w *layoutWriter) Abort() {
699 2 : if w.writable != nil {
700 1 : w.writable.Abort()
701 1 : w.writable = nil
702 1 : }
703 : }
704 :
705 : // WriteDataBlock constructs a trailer for the provided data block and writes
706 : // the block and trailer to the writer. It returns the block's handle.
707 2 : func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
708 2 : return w.writeBlock(b, w.compression, buf)
709 2 : }
710 :
711 : // WritePrecompressedDataBlock writes a pre-compressed data block and its
712 : // pre-computed trailer to the writer, returning it's block handle.
713 2 : func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (block.Handle, error) {
714 2 : return w.writePrecompressedBlock(blk)
715 2 : }
716 :
717 : // WriteIndexBlock constructs a trailer for the provided index (first or
718 : // second-level) and writes the block and trailer to the writer. It remembers
719 : // the last-written index block's handle and adds it to the file's meta index
720 : // when the writer is finished.
721 2 : func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
722 2 : h, err := w.writeBlock(b, w.compression, &w.buf)
723 2 : if err == nil {
724 2 : w.lastIndexBlockHandle = h
725 2 : }
726 2 : return h, err
727 : }
728 :
729 : // WriteFilterBlock finishes the provided filter, constructs a trailer and
730 : // writes the block and trailer to the writer. It automatically adds the filter
731 : // block to the file's meta index when the writer is finished.
732 2 : func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err error) {
733 2 : b, err := f.finish()
734 2 : if err != nil {
735 0 : return block.Handle{}, err
736 0 : }
737 2 : return w.writeNamedBlock(b, f.metaName())
738 : }
739 :
740 : // WritePropertiesBlock constructs a trailer for the provided properties block
741 : // and writes the block and trailer to the writer. It automatically adds the
742 : // properties block to the file's meta index when the writer is finished.
743 2 : func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
744 2 : return w.writeNamedBlock(b, metaPropertiesName)
745 2 : }
746 :
747 : // WriteRangeKeyBlock constructs a trailer for the provided range key block and
748 : // writes the block and trailer to the writer. It automatically adds the range
749 : // key block to the file's meta index when the writer is finished.
750 2 : func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
751 2 : return w.writeNamedBlock(b, metaRangeKeyName)
752 2 : }
753 :
754 : // WriteRangeDeletionBlock constructs a trailer for the provided range deletion
755 : // block and writes the block and trailer to the writer. It automatically adds
756 : // the range deletion block to the file's meta index when the writer is
757 : // finished.
758 2 : func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
759 2 : return w.writeNamedBlock(b, metaRangeDelV2Name)
760 2 : }
761 :
762 2 : func (w *layoutWriter) writeNamedBlock(b []byte, name string) (bh block.Handle, err error) {
763 2 : bh, err = w.writeBlock(b, block.NoCompression, &w.buf)
764 2 : if err == nil {
765 2 : w.recordToMetaindex(name, bh)
766 2 : }
767 2 : return bh, err
768 : }
769 :
770 : // WriteValueBlock writes a pre-finished value block (with the trailer) to the
771 : // writer.
772 2 : func (w *layoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
773 2 : return w.writePrecompressedBlock(blk)
774 2 : }
775 :
776 : func (w *layoutWriter) WriteValueIndexBlock(
777 : blk []byte, vbih valueBlocksIndexHandle,
778 2 : ) (block.Handle, error) {
779 2 : // NB: value index blocks are already finished and contain the block
780 2 : // trailer.
781 2 : // TODO(jackson): can this be refactored to make value blocks less
782 2 : // of a snowflake?
783 2 : off := w.offset
784 2 : w.clearFromCache(off)
785 2 : // Write the bytes to the file.
786 2 : if err := w.writable.Write(blk); err != nil {
787 0 : return block.Handle{}, err
788 0 : }
789 2 : l := uint64(len(blk))
790 2 : w.offset += l
791 2 :
792 2 : n := encodeValueBlocksIndexHandle(w.tmp[:], vbih)
793 2 : w.recordToMetaindexRaw(metaValueIndexName, w.tmp[:n])
794 2 :
795 2 : return block.Handle{Offset: off, Length: l}, nil
796 : }
797 :
798 : func (w *layoutWriter) writeBlock(
799 : b []byte, compression block.Compression, buf *blockBuf,
800 2 : ) (block.Handle, error) {
801 2 : return w.writePrecompressedBlock(block.CompressAndChecksum(
802 2 : &buf.compressedBuf, b, compression, &buf.checksummer))
803 2 : }
804 :
805 : // writePrecompressedBlock writes a pre-compressed block and its
806 : // pre-computed trailer to the writer, returning it's block handle.
807 2 : func (w *layoutWriter) writePrecompressedBlock(blk block.PhysicalBlock) (block.Handle, error) {
808 2 : w.clearFromCache(w.offset)
809 2 : // Write the bytes to the file.
810 2 : n, err := blk.WriteTo(w.writable)
811 2 : if err != nil {
812 0 : return block.Handle{}, err
813 0 : }
814 2 : bh := block.Handle{Offset: w.offset, Length: uint64(blk.LengthWithoutTrailer())}
815 2 : w.offset += uint64(n)
816 2 : return bh, nil
817 : }
818 :
819 : // Write implements io.Writer. This is analogous to writePrecompressedBlock for
820 : // blocks that already incorporate the trailer, and don't need the callee to
821 : // return a BlockHandle.
822 0 : func (w *layoutWriter) Write(blockWithTrailer []byte) (n int, err error) {
823 0 : offset := w.offset
824 0 : w.clearFromCache(offset)
825 0 : w.offset += uint64(len(blockWithTrailer))
826 0 : if err := w.writable.Write(blockWithTrailer); err != nil {
827 0 : return 0, err
828 0 : }
829 0 : return len(blockWithTrailer), nil
830 : }
831 :
832 : // clearFromCache removes the block at the provided offset from the cache. This provides defense in
833 : // depth against bugs which cause cache collisions.
834 2 : func (w *layoutWriter) clearFromCache(offset uint64) {
835 2 : if w.cacheOpts.Cache != nil {
836 2 : // TODO(peter): Alternatively, we could add the uncompressed value to the
837 2 : // cache.
838 2 : w.cacheOpts.Cache.Delete(w.cacheOpts.CacheID, w.cacheOpts.FileNum, offset)
839 2 : }
840 : }
841 :
842 2 : func (w *layoutWriter) recordToMetaindex(key string, h block.Handle) {
843 2 : n := h.EncodeVarints(w.tmp[:])
844 2 : w.recordToMetaindexRaw(key, w.tmp[:n])
845 2 : }
846 :
847 2 : func (w *layoutWriter) recordToMetaindexRaw(key string, h []byte) {
848 2 : var encodedHandle []byte
849 2 : w.handlesBuf, encodedHandle = w.handlesBuf.Alloc(len(h))
850 2 : copy(encodedHandle, h)
851 2 : w.handles = append(w.handles, metaIndexHandle{key: key, encodedBlockHandle: encodedHandle})
852 2 : }
853 :
854 2 : func (w *layoutWriter) IsFinished() bool { return w.writable == nil }
855 :
856 : // Finish serializes the sstable, writing out the meta index block and sstable
857 : // footer and closing the file. It returns the total size of the resulting
858 : // ssatable.
859 2 : func (w *layoutWriter) Finish() (size uint64, err error) {
860 2 : // Sort the meta index handles by key and write the meta index block.
861 2 : slices.SortFunc(w.handles, func(a, b metaIndexHandle) int {
862 2 : return cmp.Compare(a.key, b.key)
863 2 : })
864 2 : bw := rowblk.Writer{RestartInterval: 1}
865 2 : for _, h := range w.handles {
866 2 : bw.AddRaw(unsafe.Slice(unsafe.StringData(h.key), len(h.key)), h.encodedBlockHandle)
867 2 : }
868 2 : metaIndexHandle, err := w.writeBlock(bw.Finish(), block.NoCompression, &w.buf)
869 2 : if err != nil {
870 0 : return 0, err
871 0 : }
872 :
873 : // Write the table footer.
874 2 : footer := footer{
875 2 : format: w.tableFormat,
876 2 : checksum: w.checksumType,
877 2 : metaindexBH: metaIndexHandle,
878 2 : indexBH: w.lastIndexBlockHandle,
879 2 : }
880 2 : encodedFooter := footer.encode(w.tmp[:])
881 2 : if err := w.writable.Write(encodedFooter); err != nil {
882 0 : return 0, err
883 0 : }
884 2 : w.offset += uint64(len(encodedFooter))
885 2 :
886 2 : err = w.writable.Finish()
887 2 : w.writable = nil
888 2 : return w.offset, err
889 : }
|