Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/binfmt"
20 : "github.com/cockroachdb/pebble/internal/bytealloc"
21 : "github.com/cockroachdb/pebble/internal/sstableinternal"
22 : "github.com/cockroachdb/pebble/internal/treeprinter"
23 : "github.com/cockroachdb/pebble/objstorage"
24 : "github.com/cockroachdb/pebble/sstable/block"
25 : "github.com/cockroachdb/pebble/sstable/colblk"
26 : "github.com/cockroachdb/pebble/sstable/rowblk"
27 : "github.com/cockroachdb/pebble/sstable/valblk"
28 : )
29 :
// Layout describes the block organization of an sstable. Each field holds the
// handle (offset and length) of one kind of block; singular handles with a
// zero Length are treated as absent by orderedBlocks.
type Layout struct {
	// NOTE: changes to fields in this struct should also be reflected in
	// ValidateBlockChecksums, which validates a static list of BlockHandles
	// referenced in this struct.

	// Data holds the handles (with properties) of the data blocks.
	Data []block.HandleWithProperties
	// Index holds the handles of the index blocks that point at data blocks.
	Index []block.Handle
	// TopIndex is the handle of the top-level index block. decodeLayout only
	// sets it for tables with a two-level index.
	TopIndex block.Handle
	// Filter holds the named handles of the filter blocks.
	Filter []NamedBlockHandle
	// RangeDel is the handle of the range deletion block.
	RangeDel block.Handle
	// RangeKey is the handle of the range key block.
	RangeKey block.Handle
	// ValueBlock holds the handles of the value blocks.
	ValueBlock []block.Handle
	// ValueIndex is the handle of the value index block.
	ValueIndex block.Handle
	// Properties is the handle of the properties block.
	Properties block.Handle
	// MetaIndex is the handle of the metaindex block.
	MetaIndex block.Handle
	// Footer is the handle describing the table footer.
	Footer block.Handle
	// Format is the table format; Describe uses it to choose between row and
	// columnar block formatting.
	Format TableFormat
}
49 :
// NamedBlockHandle holds a block.Handle and corresponding name.
type NamedBlockHandle struct {
	// Handle is embedded, so its Offset and Length are accessible directly.
	block.Handle
	// Name identifies the block. orderedBlocks assigns fixed names such as
	// "data" or "index" to the blocks it synthesizes.
	Name string
}
55 :
56 : // FilterByName retrieves the block handle of the named filter, if it exists.
57 : // The provided the name should be the name as it appears in the metaindex
58 : // block.
59 0 : func (l *Layout) FilterByName(name string) (block.Handle, bool) {
60 0 : for i := range l.Filter {
61 0 : if l.Filter[i].Name == name {
62 0 : return l.Filter[i].Handle, true
63 0 : }
64 : }
65 0 : return block.Handle{}, false
66 : }
67 :
68 0 : func (l *Layout) orderedBlocks() []NamedBlockHandle {
69 0 : var blocks []NamedBlockHandle
70 0 : for i := range l.Data {
71 0 : blocks = append(blocks, NamedBlockHandle{l.Data[i].Handle, "data"})
72 0 : }
73 0 : for i := range l.Index {
74 0 : blocks = append(blocks, NamedBlockHandle{l.Index[i], "index"})
75 0 : }
76 0 : if l.TopIndex.Length != 0 {
77 0 : blocks = append(blocks, NamedBlockHandle{l.TopIndex, "top-index"})
78 0 : }
79 0 : blocks = append(blocks, l.Filter...)
80 0 : if l.RangeDel.Length != 0 {
81 0 : blocks = append(blocks, NamedBlockHandle{l.RangeDel, "range-del"})
82 0 : }
83 0 : if l.RangeKey.Length != 0 {
84 0 : blocks = append(blocks, NamedBlockHandle{l.RangeKey, "range-key"})
85 0 : }
86 0 : for i := range l.ValueBlock {
87 0 : blocks = append(blocks, NamedBlockHandle{l.ValueBlock[i], "value-block"})
88 0 : }
89 0 : if l.ValueIndex.Length != 0 {
90 0 : blocks = append(blocks, NamedBlockHandle{l.ValueIndex, "value-index"})
91 0 : }
92 0 : if l.Properties.Length != 0 {
93 0 : blocks = append(blocks, NamedBlockHandle{l.Properties, "properties"})
94 0 : }
95 0 : if l.MetaIndex.Length != 0 {
96 0 : blocks = append(blocks, NamedBlockHandle{l.MetaIndex, "meta-index"})
97 0 : }
98 0 : if l.Footer.Length != 0 {
99 0 : if l.Footer.Length == levelDBFooterLen {
100 0 : blocks = append(blocks, NamedBlockHandle{l.Footer, "leveldb-footer"})
101 0 : } else {
102 0 : blocks = append(blocks, NamedBlockHandle{l.Footer, "footer"})
103 0 : }
104 : }
105 0 : slices.SortFunc(blocks, func(a, b NamedBlockHandle) int {
106 0 : return cmp.Compare(a.Offset, b.Offset)
107 0 : })
108 0 : return blocks
109 : }
110 :
111 : // Describe returns a description of the layout. If the verbose parameter is
112 : // true, details of the structure of each block are returned as well.
113 : // If verbose is true and fmtKV is non-nil, the output includes the KVs (as formatted by this function).
114 : func (l *Layout) Describe(
115 : verbose bool, r *Reader, fmtKV func(key *base.InternalKey, value []byte) string,
116 0 : ) string {
117 0 : ctx := context.TODO()
118 0 :
119 0 : blocks := l.orderedBlocks()
120 0 : formatting := rowblkFormatting
121 0 : if l.Format.BlockColumnar() {
122 0 : formatting = colblkFormatting
123 0 : }
124 :
125 0 : tp := treeprinter.New()
126 0 : root := tp.Child("sstable")
127 0 :
128 0 : for i := range blocks {
129 0 : b := &blocks[i]
130 0 : tpNode := root.Childf("%s offset: %d length: %d", b.Name, b.Offset, b.Length)
131 0 :
132 0 : if !verbose {
133 0 : continue
134 : }
135 0 : if b.Name == "filter" {
136 0 : continue
137 : }
138 :
139 0 : if b.Name == "footer" || b.Name == "leveldb-footer" {
140 0 : trailer, offset := make([]byte, b.Length), 0
141 0 : _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset))
142 0 :
143 0 : if b.Name == "footer" {
144 0 : checksumType := block.ChecksumType(trailer[0])
145 0 : tpNode.Childf("%03d checksum type: %s", offset, checksumType)
146 0 : trailer, offset = trailer[1:], offset+1
147 0 : }
148 :
149 0 : metaHandle, n := binary.Uvarint(trailer)
150 0 : metaLen, m := binary.Uvarint(trailer[n:])
151 0 : tpNode.Childf("%03d meta: offset=%d, length=%d", offset, metaHandle, metaLen)
152 0 : trailer, offset = trailer[n+m:], offset+n+m
153 0 :
154 0 : indexHandle, n := binary.Uvarint(trailer)
155 0 : indexLen, m := binary.Uvarint(trailer[n:])
156 0 : tpNode.Childf("%03d index: offset=%d, length=%d", offset, indexHandle, indexLen)
157 0 : trailer, offset = trailer[n+m:], offset+n+m
158 0 :
159 0 : trailing := 12
160 0 : if b.Name == "leveldb-footer" {
161 0 : trailing = 8
162 0 : }
163 :
164 0 : offset += len(trailer) - trailing
165 0 : trailer = trailer[len(trailer)-trailing:]
166 0 :
167 0 : if b.Name == "footer" {
168 0 : version := trailer[:4]
169 0 : tpNode.Childf("%03d version: %d", offset, binary.LittleEndian.Uint32(version))
170 0 : trailer, offset = trailer[4:], offset+4
171 0 : }
172 :
173 0 : magicNumber := trailer
174 0 : tpNode.Childf("%03d magic number: 0x%x", offset, magicNumber)
175 0 :
176 0 : continue
177 : }
178 :
179 : // Read the block and format it. Returns an error if we couldn't read the
180 : // block.
181 0 : err := func() error {
182 0 : var err error
183 0 : var h block.BufferHandle
184 0 : // Defer release of any block handle that will have been read.
185 0 : defer func() { h.Release() }()
186 :
187 0 : switch b.Name {
188 0 : case "data":
189 0 : h, err = r.readDataBlock(ctx, noEnv, noReadHandle, b.Handle)
190 0 : if err != nil {
191 0 : return err
192 0 : }
193 0 : if fmtKV == nil {
194 0 : formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), nil)
195 0 : } else {
196 0 : var lastKey InternalKey
197 0 : formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), func(key *base.InternalKey, value []byte) string {
198 0 : v := fmtKV(key, value)
199 0 : if base.InternalCompare(r.Compare, lastKey, *key) >= 0 {
200 0 : v += " WARNING: OUT OF ORDER KEYS!"
201 0 : }
202 0 : lastKey.Trailer = key.Trailer
203 0 : lastKey.UserKey = append(lastKey.UserKey[:0], key.UserKey...)
204 0 : return v
205 : })
206 : }
207 :
208 0 : case "range-del":
209 0 : h, err = r.readRangeDelBlock(ctx, noEnv, noReadHandle, b.Handle)
210 0 : if err != nil {
211 0 : return err
212 0 : }
213 : // TODO(jackson): colblk ignores fmtKV, because it doesn't
214 : // make sense in the context.
215 0 : formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
216 :
217 0 : case "range-key":
218 0 : h, err = r.readRangeKeyBlock(ctx, noEnv, noReadHandle, b.Handle)
219 0 : if err != nil {
220 0 : return err
221 0 : }
222 : // TODO(jackson): colblk ignores fmtKV, because it doesn't
223 : // make sense in the context.
224 0 : formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
225 :
226 0 : case "index", "top-index":
227 0 : h, err = r.readIndexBlock(ctx, noEnv, noReadHandle, b.Handle)
228 0 : if err != nil {
229 0 : return err
230 0 : }
231 0 : formatting.formatIndexBlock(tpNode, r, *b, h.BlockData())
232 :
233 0 : case "properties":
234 0 : h, err = r.readBlockInternal(ctx, noEnv, noReadHandle, b.Handle, noInitBlockMetadataFn)
235 0 : if err != nil {
236 0 : return err
237 0 : }
238 0 : iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
239 0 : iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
240 0 : fmt.Fprintf(w, "%05d %s (%d)", enc.Offset, key.UserKey, enc.Length)
241 0 : })
242 :
243 0 : case "meta-index":
244 0 : if b.Handle != r.metaindexBH {
245 0 : return base.AssertionFailedf("range-del block handle does not match rangeDelBH")
246 0 : }
247 0 : h, err = r.readMetaindexBlock(ctx, noEnv, noReadHandle)
248 0 : if err != nil {
249 0 : return err
250 0 : }
251 0 : iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
252 0 : iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
253 0 : var bh block.Handle
254 0 : var n int
255 0 : var vbih valblk.IndexHandle
256 0 : isValueBlocksIndexHandle := false
257 0 : if bytes.Equal(iter.Key().UserKey, []byte(metaValueIndexName)) {
258 0 : vbih, n, err = valblk.DecodeIndexHandle(value)
259 0 : bh = vbih.Handle
260 0 : isValueBlocksIndexHandle = true
261 0 : } else {
262 0 : bh, n = block.DecodeHandle(value)
263 0 : }
264 0 : if n == 0 || n != len(value) {
265 0 : fmt.Fprintf(w, "%04d [err: %s]\n", enc.Offset, err)
266 0 : return
267 0 : }
268 0 : var vbihStr string
269 0 : if isValueBlocksIndexHandle {
270 0 : vbihStr = fmt.Sprintf(" value-blocks-index-lengths: %d(num), %d(offset), %d(length)",
271 0 : vbih.BlockNumByteLength, vbih.BlockOffsetByteLength, vbih.BlockLengthByteLength)
272 0 : }
273 0 : fmt.Fprintf(w, "%04d %s block:%d/%d%s",
274 0 : uint64(enc.Offset), iter.Key().UserKey, bh.Offset, bh.Length, vbihStr)
275 : })
276 :
277 0 : case "value-block":
278 : // We don't peer into the value-block since it can't be interpreted
279 : // without the valueHandles.
280 0 : case "value-index":
281 : // We have already read the value-index to construct the list of
282 : // value-blocks, so no need to do it again.
283 : }
284 :
285 : // Format the trailer.
286 0 : trailer := make([]byte, block.TrailerLen)
287 0 : _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset+b.Length))
288 0 : algo := block.CompressionIndicator(trailer[0])
289 0 : checksum := binary.LittleEndian.Uint32(trailer[1:])
290 0 : tpNode.Childf("trailer [compression=%s checksum=0x%04x]", algo, checksum)
291 0 : return nil
292 : }()
293 0 : if err != nil {
294 0 : tpNode.Childf("error reading block: %v", err)
295 0 : }
296 : }
297 0 : return tp.String()
298 : }
299 :
// blockFormatting bundles the functions Describe uses to render the contents
// of index, data and keyspan (range-del and range-key) blocks.
type blockFormatting struct {
	// formatIndexBlock renders "index" and "top-index" blocks.
	formatIndexBlock formatBlockFunc
	// formatDataBlock renders "data" blocks.
	formatDataBlock formatBlockFuncKV
	// formatKeyspanBlock renders "range-del" and "range-key" blocks.
	formatKeyspanBlock formatBlockFuncKV
}
305 :
type (
	// formatBlockFunc describes a block's contents beneath the given tree
	// node. The byte slice is the block data.
	formatBlockFunc func(treeprinter.Node, *Reader, NamedBlockHandle, []byte) error
	// formatBlockFuncKV is like formatBlockFunc but additionally accepts a
	// function for formatting individual key-value pairs, which may be nil.
	formatBlockFuncKV func(treeprinter.Node, *Reader, NamedBlockHandle, []byte, func(*base.InternalKey, []byte) string) error
)
310 :
var (
	// rowblkFormatting is the formatting Describe uses unless the table
	// format reports columnar blocks. Keyspan blocks are rendered with the
	// same function as data blocks.
	rowblkFormatting = blockFormatting{
		formatIndexBlock:   formatRowblkIndexBlock,
		formatDataBlock:    formatRowblkDataBlock,
		formatKeyspanBlock: formatRowblkDataBlock,
	}
	// colblkFormatting is the formatting Describe uses when the table format
	// reports columnar blocks.
	colblkFormatting = blockFormatting{
		formatIndexBlock:   formatColblkIndexBlock,
		formatDataBlock:    formatColblkDataBlock,
		formatKeyspanBlock: formatColblkKeyspanBlock,
	}
)
323 :
324 0 : func formatColblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
325 0 : var iter colblk.IndexIter
326 0 : if err := iter.Init(r.Comparer, data, NoTransforms); err != nil {
327 0 : return err
328 0 : }
329 0 : defer iter.Close()
330 0 : i := 0
331 0 : for v := iter.First(); v; v = iter.Next() {
332 0 : bh, err := iter.BlockHandleWithProperties()
333 0 : if err != nil {
334 0 : return err
335 0 : }
336 0 : tp.Childf("%05d block:%d/%d\n", i, bh.Offset, bh.Length)
337 0 : i++
338 : }
339 0 : return nil
340 : }
341 :
342 : func formatColblkDataBlock(
343 : tp treeprinter.Node,
344 : r *Reader,
345 : b NamedBlockHandle,
346 : data []byte,
347 : fmtKV func(key *base.InternalKey, value []byte) string,
348 0 : ) error {
349 0 : var decoder colblk.DataBlockDecoder
350 0 : decoder.Init(r.keySchema, data)
351 0 : f := binfmt.New(data)
352 0 : decoder.Describe(f, tp)
353 0 :
354 0 : if fmtKV != nil {
355 0 : var iter colblk.DataBlockIter
356 0 : iter.InitOnce(r.keySchema, r.Comparer, describingLazyValueHandler{})
357 0 : if err := iter.Init(&decoder, block.IterTransforms{}); err != nil {
358 0 : return err
359 0 : }
360 0 : defer iter.Close()
361 0 : for kv := iter.First(); kv != nil; kv = iter.Next() {
362 0 : tp.Child(fmtKV(&kv.K, kv.V.ValueOrHandle))
363 0 : }
364 : }
365 0 : return nil
366 : }
367 :
368 : // describingLazyValueHandler is a block.GetLazyValueForPrefixAndValueHandler
369 : // that replaces a value handle with an in-place value describing the handle.
370 : type describingLazyValueHandler struct{}
371 :
372 : // Assert that debugLazyValueHandler implements the
373 : // block.GetLazyValueForPrefixAndValueHandler interface.
374 : var _ block.GetLazyValueForPrefixAndValueHandler = describingLazyValueHandler{}
375 :
376 : func (describingLazyValueHandler) GetLazyValueForPrefixAndValueHandle(
377 : handle []byte,
378 0 : ) base.LazyValue {
379 0 : vh := valblk.DecodeHandle(handle[1:])
380 0 : return base.LazyValue{ValueOrHandle: []byte(fmt.Sprintf("value handle %+v", vh))}
381 0 : }
382 :
383 : func formatColblkKeyspanBlock(
384 : tp treeprinter.Node,
385 : r *Reader,
386 : b NamedBlockHandle,
387 : data []byte,
388 : _ func(*base.InternalKey, []byte) string,
389 0 : ) error {
390 0 : var decoder colblk.KeyspanDecoder
391 0 : decoder.Init(data)
392 0 : f := binfmt.New(data)
393 0 : decoder.Describe(f, tp)
394 0 : return nil
395 0 : }
396 :
397 0 : func formatRowblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
398 0 : iter, err := rowblk.NewIter(r.Compare, r.Comparer.ComparePointSuffixes, r.Split, data, NoTransforms)
399 0 : if err != nil {
400 0 : return err
401 0 : }
402 0 : iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
403 0 : bh, err := block.DecodeHandleWithProperties(value)
404 0 : if err != nil {
405 0 : fmt.Fprintf(w, "%05d [err: %s]\n", enc.Offset, err)
406 0 : return
407 0 : }
408 0 : fmt.Fprintf(w, "%05d block:%d/%d", enc.Offset, bh.Offset, bh.Length)
409 0 : if enc.IsRestart {
410 0 : fmt.Fprintf(w, " [restart]")
411 0 : }
412 : })
413 0 : return nil
414 : }
415 :
416 : func formatRowblkDataBlock(
417 : tp treeprinter.Node,
418 : r *Reader,
419 : b NamedBlockHandle,
420 : data []byte,
421 : fmtRecord func(key *base.InternalKey, value []byte) string,
422 0 : ) error {
423 0 : iter, err := rowblk.NewIter(r.Compare, r.Comparer.ComparePointSuffixes, r.Split, data, NoTransforms)
424 0 : if err != nil {
425 0 : return err
426 0 : }
427 0 : iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
428 0 : // The format of the numbers in the record line is:
429 0 : //
430 0 : // (<total> = <length> [<shared>] + <unshared> + <value>)
431 0 : //
432 0 : // <total> is the total number of bytes for the record.
433 0 : // <length> is the size of the 3 varint encoded integers for <shared>,
434 0 : // <unshared>, and <value>.
435 0 : // <shared> is the number of key bytes shared with the previous key.
436 0 : // <unshared> is the number of unshared key bytes.
437 0 : // <value> is the number of value bytes.
438 0 : fmt.Fprintf(w, "%05d record (%d = %d [%d] + %d + %d)",
439 0 : uint64(enc.Offset), enc.Length,
440 0 : enc.Length-int32(enc.KeyUnshared+enc.ValueLen), enc.KeyShared, enc.KeyUnshared, enc.ValueLen)
441 0 : if enc.IsRestart {
442 0 : fmt.Fprint(w, " [restart]")
443 0 : }
444 0 : if fmtRecord != nil {
445 0 : if r.tableFormat < TableFormatPebblev3 || key.Kind() != InternalKeyKindSet {
446 0 : fmt.Fprintf(w, "\n %s", fmtRecord(key, value))
447 0 : } else if !block.ValuePrefix(value[0]).IsValueHandle() {
448 0 : fmt.Fprintf(w, "\n %s", fmtRecord(key, value[1:]))
449 0 : } else {
450 0 : vh := valblk.DecodeHandle(value[1:])
451 0 : fmt.Fprintf(w, "\n %s", fmtRecord(key, []byte(fmt.Sprintf("value handle %+v", vh))))
452 0 : }
453 : }
454 : })
455 0 : return nil
456 : }
457 :
458 0 : func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
459 0 : foot, err := parseFooter(data, 0, int64(len(data)))
460 0 : if err != nil {
461 0 : return Layout{}, err
462 0 : }
463 0 : decompressedMeta, err := decompressInMemory(data, foot.metaindexBH)
464 0 : if err != nil {
465 0 : return Layout{}, errors.Wrap(err, "decompressing metaindex")
466 0 : }
467 0 : meta, vbih, err := decodeMetaindex(decompressedMeta)
468 0 : if err != nil {
469 0 : return Layout{}, err
470 0 : }
471 0 : layout := Layout{
472 0 : MetaIndex: foot.metaindexBH,
473 0 : Properties: meta[metaPropertiesName],
474 0 : RangeDel: meta[metaRangeDelV2Name],
475 0 : RangeKey: meta[metaRangeKeyName],
476 0 : ValueIndex: vbih.Handle,
477 0 : Footer: foot.footerBH,
478 0 : Format: foot.format,
479 0 : }
480 0 : var props Properties
481 0 : decompressedProps, err := decompressInMemory(data, layout.Properties)
482 0 : if err != nil {
483 0 : return Layout{}, errors.Wrap(err, "decompressing properties")
484 0 : }
485 0 : if err := props.load(decompressedProps, map[string]struct{}{}); err != nil {
486 0 : return Layout{}, err
487 0 : }
488 :
489 0 : if props.IndexType == twoLevelIndex {
490 0 : decompressed, err := decompressInMemory(data, foot.indexBH)
491 0 : if err != nil {
492 0 : return Layout{}, errors.Wrap(err, "decompressing two-level index")
493 0 : }
494 0 : layout.TopIndex = foot.indexBH
495 0 : topLevelIter, err := newIndexIter(foot.format, comparer, decompressed)
496 0 : if err != nil {
497 0 : return Layout{}, err
498 0 : }
499 0 : err = forEachIndexEntry(topLevelIter, func(bhp block.HandleWithProperties) {
500 0 : layout.Index = append(layout.Index, bhp.Handle)
501 0 : })
502 0 : if err != nil {
503 0 : return Layout{}, err
504 0 : }
505 0 : } else {
506 0 : layout.Index = append(layout.Index, foot.indexBH)
507 0 : }
508 0 : for _, indexBH := range layout.Index {
509 0 : decompressed, err := decompressInMemory(data, indexBH)
510 0 : if err != nil {
511 0 : return Layout{}, errors.Wrap(err, "decompressing index block")
512 0 : }
513 0 : indexIter, err := newIndexIter(foot.format, comparer, decompressed)
514 0 : if err != nil {
515 0 : return Layout{}, err
516 0 : }
517 0 : err = forEachIndexEntry(indexIter, func(bhp block.HandleWithProperties) {
518 0 : layout.Data = append(layout.Data, bhp)
519 0 : })
520 0 : if err != nil {
521 0 : return Layout{}, err
522 0 : }
523 : }
524 :
525 0 : if layout.ValueIndex.Length > 0 {
526 0 : vbiBlock, err := decompressInMemory(data, layout.ValueIndex)
527 0 : if err != nil {
528 0 : return Layout{}, errors.Wrap(err, "decompressing value index")
529 0 : }
530 0 : layout.ValueBlock, err = valblk.DecodeIndex(vbiBlock, vbih)
531 0 : if err != nil {
532 0 : return Layout{}, err
533 0 : }
534 : }
535 :
536 0 : return layout, nil
537 : }
538 :
539 0 : func decompressInMemory(data []byte, bh block.Handle) ([]byte, error) {
540 0 : typ := block.CompressionIndicator(data[bh.Offset+bh.Length])
541 0 : var decompressed []byte
542 0 : if typ == block.NoCompressionIndicator {
543 0 : return data[bh.Offset : bh.Offset+bh.Length], nil
544 0 : }
545 : // Decode the length of the decompressed value.
546 0 : decodedLen, prefixLen, err := block.DecompressedLen(typ, data[bh.Offset:bh.Offset+bh.Length])
547 0 : if err != nil {
548 0 : return nil, err
549 0 : }
550 0 : decompressed = make([]byte, decodedLen)
551 0 : if err := block.DecompressInto(typ, data[int(bh.Offset)+prefixLen:bh.Offset+bh.Length], decompressed); err != nil {
552 0 : return nil, err
553 0 : }
554 0 : return decompressed, nil
555 : }
556 :
557 : func newIndexIter(
558 : tableFormat TableFormat, comparer *base.Comparer, data []byte,
559 0 : ) (block.IndexBlockIterator, error) {
560 0 : var iter block.IndexBlockIterator
561 0 : var err error
562 0 : if tableFormat <= TableFormatPebblev4 {
563 0 : iter = new(rowblk.IndexIter)
564 0 : err = iter.Init(comparer, data, block.NoTransforms)
565 0 : } else {
566 0 : iter = new(colblk.IndexIter)
567 0 : err = iter.Init(comparer, data, block.NoTransforms)
568 0 : }
569 0 : if err != nil {
570 0 : return nil, err
571 0 : }
572 0 : return iter, nil
573 : }
574 :
575 : func forEachIndexEntry(
576 : indexIter block.IndexBlockIterator, fn func(block.HandleWithProperties),
577 0 : ) error {
578 0 : for v := indexIter.First(); v; v = indexIter.Next() {
579 0 : bhp, err := indexIter.BlockHandleWithProperties()
580 0 : if err != nil {
581 0 : return err
582 0 : }
583 0 : fn(bhp)
584 : }
585 0 : return indexIter.Close()
586 : }
587 :
588 : func decodeMetaindex(
589 : data []byte,
590 1 : ) (meta map[string]block.Handle, vbih valblk.IndexHandle, err error) {
591 1 : i, err := rowblk.NewRawIter(bytes.Compare, data)
592 1 : if err != nil {
593 0 : return nil, valblk.IndexHandle{}, err
594 0 : }
595 1 : defer func() { err = firstError(err, i.Close()) }()
596 :
597 1 : meta = map[string]block.Handle{}
598 1 : for valid := i.First(); valid; valid = i.Next() {
599 1 : value := i.Value()
600 1 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
601 1 : var n int
602 1 : vbih, n, err = valblk.DecodeIndexHandle(i.Value())
603 1 : if err != nil {
604 0 : return nil, vbih, err
605 0 : }
606 1 : if n == 0 || n != len(value) {
607 0 : return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
608 0 : }
609 1 : } else {
610 1 : bh, n := block.DecodeHandle(value)
611 1 : if n == 0 || n != len(value) {
612 0 : return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
613 0 : }
614 1 : meta[string(i.Key().UserKey)] = bh
615 : }
616 : }
617 1 : return meta, vbih, nil
618 : }
619 :
// layoutWriter writes the structure of an sstable to durable storage. It
// accepts serialized blocks, writes them to storage and returns a block handle
// describing the offset and length of the block.
type layoutWriter struct {
	// writable is the destination of all writes. It is set to nil once the
	// writer has been finished or aborted.
	writable objstorage.Writable

	// cacheOpts are used to remove blocks written to the sstable from the cache,
	// providing a defense in depth against bugs which cause cache collisions.
	cacheOpts sstableinternal.CacheOptions

	// options copied from WriterOptions
	tableFormat  TableFormat
	compression  block.Compression
	checksumType block.ChecksumType

	// offset tracks the current write offset within the writable.
	offset uint64
	// lastIndexBlockHandle holds the handle to the most recently-written index
	// block. It's updated by WriteIndexBlock. When writing sstables with a
	// single-level index, this field will be updated once. When writing
	// sstables with a two-level index, the last update will set the two-level
	// index.
	lastIndexBlockHandle block.Handle
	// handles accumulates the metaindex entries recorded while writing; Finish
	// sorts them by key and writes them out as the metaindex block.
	handles []metaIndexHandle
	// handlesBuf provides the backing storage for the encoded handles
	// referenced from handles.
	handlesBuf bytealloc.A
	// tmp is scratch space used when encoding block handles and the footer.
	tmp [blockHandleLikelyMaxLen]byte
	// buf is the block buffer used for blocks the writer compresses and
	// checksums with its own buffer (index, named and metaindex blocks).
	buf blockBuf
}
648 :
649 1 : func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
650 1 : return layoutWriter{
651 1 : writable: w,
652 1 : cacheOpts: opts.internal.CacheOpts,
653 1 : tableFormat: opts.TableFormat,
654 1 : compression: opts.Compression,
655 1 : checksumType: opts.Checksum,
656 1 : buf: blockBuf{
657 1 : checksummer: block.Checksummer{Type: opts.Checksum},
658 1 : },
659 1 : }
660 1 : }
661 :
// metaIndexHandle is a pending metaindex entry: a key and the already-encoded
// value to store under it.
type metaIndexHandle struct {
	// key is the entry's name in the metaindex block.
	key string
	// encodedBlockHandle is the entry's value, an encoded handle.
	encodedBlockHandle []byte
}
666 :
667 : // Abort aborts writing the table, aborting the underlying writable too. Abort
668 : // is idempotent.
669 1 : func (w *layoutWriter) Abort() {
670 1 : if w.writable != nil {
671 0 : w.writable.Abort()
672 0 : w.writable = nil
673 0 : }
674 : }
675 :
676 : // WriteDataBlock constructs a trailer for the provided data block and writes
677 : // the block and trailer to the writer. It returns the block's handle. It can
678 : // mangle b.
679 1 : func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
680 1 : return w.writeBlock(b, w.compression, buf)
681 1 : }
682 :
683 : // WritePrecompressedDataBlock writes a pre-compressed data block and its
684 : // pre-computed trailer to the writer, returning its block handle. It can mangle
685 : // the block data.
686 1 : func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (block.Handle, error) {
687 1 : return w.writePrecompressedBlock(blk)
688 1 : }
689 :
690 : // WriteIndexBlock constructs a trailer for the provided index (first or
691 : // second-level) and writes the block and trailer to the writer. It remembers
692 : // the last-written index block's handle and adds it to the file's meta index
693 : // when the writer is finished.
694 : //
695 : // WriteIndexBlock can mangle b.
696 1 : func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
697 1 : h, err := w.writeBlock(b, w.compression, &w.buf)
698 1 : if err == nil {
699 1 : w.lastIndexBlockHandle = h
700 1 : }
701 1 : return h, err
702 : }
703 :
704 : // WriteFilterBlock finishes the provided filter, constructs a trailer and
705 : // writes the block and trailer to the writer. It automatically adds the filter
706 : // block to the file's meta index when the writer is finished.
707 1 : func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err error) {
708 1 : b, err := f.finish()
709 1 : if err != nil {
710 0 : return block.Handle{}, err
711 0 : }
712 1 : return w.writeNamedBlock(b, f.metaName())
713 : }
714 :
715 : // WritePropertiesBlock constructs a trailer for the provided properties block
716 : // and writes the block and trailer to the writer. It automatically adds the
717 : // properties block to the file's meta index when the writer is finished.
718 : //
719 : // WritePropertiesBlock can mangle b.
720 1 : func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
721 1 : return w.writeNamedBlock(b, metaPropertiesName)
722 1 : }
723 :
724 : // WriteRangeKeyBlock constructs a trailer for the provided range key block and
725 : // writes the block and trailer to the writer. It automatically adds the range
726 : // key block to the file's meta index when the writer is finished.
727 : //
728 : // WriteRangeKeyBlock can mangle the block data.
729 1 : func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
730 1 : return w.writeNamedBlock(b, metaRangeKeyName)
731 1 : }
732 :
733 : // WriteRangeDeletionBlock constructs a trailer for the provided range deletion
734 : // block and writes the block and trailer to the writer. It automatically adds
735 : // the range deletion block to the file's meta index when the writer is
736 : // finished.
737 : //
738 : // WriteRangeDeletionBlock can mangle the block data.
739 1 : func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
740 1 : return w.writeNamedBlock(b, metaRangeDelV2Name)
741 1 : }
742 :
743 : // writeNamedBlock can mangle the block data.
744 1 : func (w *layoutWriter) writeNamedBlock(b []byte, name string) (bh block.Handle, err error) {
745 1 : bh, err = w.writeBlock(b, block.NoCompression, &w.buf)
746 1 : if err == nil {
747 1 : w.recordToMetaindex(name, bh)
748 1 : }
749 1 : return bh, err
750 : }
751 :
752 : // WriteValueBlock writes a pre-finished value block (with the trailer) to the
753 : // writer. It can mangle the block data.
754 1 : func (w *layoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
755 1 : return w.writePrecompressedBlock(blk)
756 1 : }
757 :
758 : // WriteValueIndexBlock writes a value index block and adds it to the meta
759 : // index. It can mangle the block data.
760 : func (w *layoutWriter) WriteValueIndexBlock(
761 : blk block.PhysicalBlock, vbih valblk.IndexHandle,
762 1 : ) (block.Handle, error) {
763 1 : h, err := w.writePrecompressedBlock(blk)
764 1 : if err != nil {
765 0 : return block.Handle{}, err
766 0 : }
767 1 : n := valblk.EncodeIndexHandle(w.tmp[:], vbih)
768 1 : w.recordToMetaindexRaw(metaValueIndexName, w.tmp[:n])
769 1 : return h, nil
770 : }
771 :
772 : // writeBlock checksums, compresses, and writes out a block. It can mangle b.
773 : func (w *layoutWriter) writeBlock(
774 : b []byte, compression block.Compression, buf *blockBuf,
775 1 : ) (block.Handle, error) {
776 1 : return w.writePrecompressedBlock(block.CompressAndChecksum(
777 1 : &buf.dataBuf, b, compression, &buf.checksummer))
778 1 : }
779 :
780 : // writePrecompressedBlock writes a pre-compressed block and its
781 : // pre-computed trailer to the writer, returning it's block handle.
782 : //
783 : // writePrecompressedBlock might mangle the block data.
784 1 : func (w *layoutWriter) writePrecompressedBlock(blk block.PhysicalBlock) (block.Handle, error) {
785 1 : w.clearFromCache(w.offset)
786 1 : // Write the bytes to the file. This call can mangle the block data.
787 1 : n, err := blk.WriteTo(w.writable)
788 1 : if err != nil {
789 0 : return block.Handle{}, err
790 0 : }
791 1 : bh := block.Handle{Offset: w.offset, Length: uint64(blk.LengthWithoutTrailer())}
792 1 : w.offset += uint64(n)
793 1 : return bh, nil
794 : }
795 :
796 : // Write implements io.Writer (with the caveat that it can mangle the block
797 : // data). This is analogous to writePrecompressedBlock for blocks that already
798 : // incorporate the trailer, and don't need the callee to return a BlockHandle.
799 0 : func (w *layoutWriter) Write(blockWithTrailer []byte) (n int, err error) {
800 0 : offset := w.offset
801 0 : w.clearFromCache(offset)
802 0 : w.offset += uint64(len(blockWithTrailer))
803 0 : // This call can mangle blockWithTrailer.
804 0 : if err := w.writable.Write(blockWithTrailer); err != nil {
805 0 : return 0, err
806 0 : }
807 0 : return len(blockWithTrailer), nil
808 : }
809 :
810 : // clearFromCache removes the block at the provided offset from the cache. This provides defense in
811 : // depth against bugs which cause cache collisions.
812 1 : func (w *layoutWriter) clearFromCache(offset uint64) {
813 1 : if w.cacheOpts.Cache != nil {
814 1 : // TODO(peter): Alternatively, we could add the uncompressed value to the
815 1 : // cache.
816 1 : w.cacheOpts.Cache.Delete(w.cacheOpts.CacheID, w.cacheOpts.FileNum, offset)
817 1 : }
818 : }
819 :
820 1 : func (w *layoutWriter) recordToMetaindex(key string, h block.Handle) {
821 1 : n := h.EncodeVarints(w.tmp[:])
822 1 : w.recordToMetaindexRaw(key, w.tmp[:n])
823 1 : }
824 :
825 1 : func (w *layoutWriter) recordToMetaindexRaw(key string, h []byte) {
826 1 : var encodedHandle []byte
827 1 : w.handlesBuf, encodedHandle = w.handlesBuf.Alloc(len(h))
828 1 : copy(encodedHandle, h)
829 1 : w.handles = append(w.handles, metaIndexHandle{key: key, encodedBlockHandle: encodedHandle})
830 1 : }
831 :
832 1 : func (w *layoutWriter) IsFinished() bool { return w.writable == nil }
833 :
834 : // Finish serializes the sstable, writing out the meta index block and sstable
835 : // footer and closing the file. It returns the total size of the resulting
836 : // ssatable.
837 1 : func (w *layoutWriter) Finish() (size uint64, err error) {
838 1 : // Sort the meta index handles by key and write the meta index block.
839 1 : slices.SortFunc(w.handles, func(a, b metaIndexHandle) int {
840 1 : return cmp.Compare(a.key, b.key)
841 1 : })
842 1 : bw := rowblk.Writer{RestartInterval: 1}
843 1 : for _, h := range w.handles {
844 1 : bw.AddRaw(unsafe.Slice(unsafe.StringData(h.key), len(h.key)), h.encodedBlockHandle)
845 1 : }
846 1 : metaIndexHandle, err := w.writeBlock(bw.Finish(), block.NoCompression, &w.buf)
847 1 : if err != nil {
848 0 : return 0, err
849 0 : }
850 :
851 : // Write the table footer.
852 1 : footer := footer{
853 1 : format: w.tableFormat,
854 1 : checksum: w.checksumType,
855 1 : metaindexBH: metaIndexHandle,
856 1 : indexBH: w.lastIndexBlockHandle,
857 1 : }
858 1 : encodedFooter := footer.encode(w.tmp[:])
859 1 : if err := w.writable.Write(encodedFooter); err != nil {
860 0 : return 0, err
861 0 : }
862 1 : w.offset += uint64(len(encodedFooter))
863 1 :
864 1 : err = w.writable.Finish()
865 1 : w.writable = nil
866 1 : return w.offset, err
867 : }
|