Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "unsafe"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/binfmt"
20 : "github.com/cockroachdb/pebble/internal/bytealloc"
21 : "github.com/cockroachdb/pebble/internal/sstableinternal"
22 : "github.com/cockroachdb/pebble/internal/treeprinter"
23 : "github.com/cockroachdb/pebble/objstorage"
24 : "github.com/cockroachdb/pebble/sstable/block"
25 : "github.com/cockroachdb/pebble/sstable/colblk"
26 : "github.com/cockroachdb/pebble/sstable/rowblk"
27 : "github.com/cockroachdb/pebble/sstable/valblk"
28 : )
29 :
// Layout describes the block organization of an sstable. A handle whose
// Length is zero is treated as "block not present" when the layout is
// enumerated by orderedBlocks.
type Layout struct {
	// NOTE: changes to fields in this struct should also be reflected in
	// ValidateBlockChecksums, which validates a static list of BlockHandles
	// referenced in this struct.

	// Data holds the handle (and properties) of every data block.
	Data []block.HandleWithProperties
	// Index holds the handles of the index blocks that point at data blocks.
	Index []block.Handle
	// TopIndex is the handle of the top-level index block. decodeLayout only
	// sets it for tables with a two-level index.
	TopIndex block.Handle
	// Filter holds the handles of the filter blocks, each paired with a name.
	Filter []NamedBlockHandle
	// RangeDel is the handle of the range deletion block.
	RangeDel block.Handle
	// RangeKey is the handle of the range key block.
	RangeKey block.Handle
	// ValueBlock holds the handles of the value blocks.
	ValueBlock []block.Handle
	// ValueIndex is the handle of the value block index.
	ValueIndex block.Handle
	// Properties is the handle of the properties block.
	Properties block.Handle
	// MetaIndex is the handle of the metaindex block.
	MetaIndex block.Handle
	// Footer is the handle describing the table footer.
	Footer block.Handle
	// Format is the table format of the sstable.
	Format TableFormat
}
49 :
// NamedBlockHandle holds a block.Handle and corresponding name.
type NamedBlockHandle struct {
	// Handle locates the block within the file (embedded, so Offset and
	// Length are promoted).
	block.Handle
	// Name labels the block, e.g. "data" or "index" for handles produced by
	// Layout.orderedBlocks.
	Name string
}
55 :
56 : // FilterByName retrieves the block handle of the named filter, if it exists.
57 : // The provided the name should be the name as it appears in the metaindex
58 : // block.
59 1 : func (l *Layout) FilterByName(name string) (block.Handle, bool) {
60 1 : for i := range l.Filter {
61 1 : if l.Filter[i].Name == name {
62 1 : return l.Filter[i].Handle, true
63 1 : }
64 : }
65 1 : return block.Handle{}, false
66 : }
67 :
68 1 : func (l *Layout) orderedBlocks() []NamedBlockHandle {
69 1 : var blocks []NamedBlockHandle
70 1 : for i := range l.Data {
71 1 : blocks = append(blocks, NamedBlockHandle{l.Data[i].Handle, "data"})
72 1 : }
73 1 : for i := range l.Index {
74 1 : blocks = append(blocks, NamedBlockHandle{l.Index[i], "index"})
75 1 : }
76 1 : if l.TopIndex.Length != 0 {
77 1 : blocks = append(blocks, NamedBlockHandle{l.TopIndex, "top-index"})
78 1 : }
79 1 : blocks = append(blocks, l.Filter...)
80 1 : if l.RangeDel.Length != 0 {
81 1 : blocks = append(blocks, NamedBlockHandle{l.RangeDel, "range-del"})
82 1 : }
83 1 : if l.RangeKey.Length != 0 {
84 1 : blocks = append(blocks, NamedBlockHandle{l.RangeKey, "range-key"})
85 1 : }
86 1 : for i := range l.ValueBlock {
87 1 : blocks = append(blocks, NamedBlockHandle{l.ValueBlock[i], "value-block"})
88 1 : }
89 1 : if l.ValueIndex.Length != 0 {
90 1 : blocks = append(blocks, NamedBlockHandle{l.ValueIndex, "value-index"})
91 1 : }
92 1 : if l.Properties.Length != 0 {
93 1 : blocks = append(blocks, NamedBlockHandle{l.Properties, "properties"})
94 1 : }
95 1 : if l.MetaIndex.Length != 0 {
96 1 : blocks = append(blocks, NamedBlockHandle{l.MetaIndex, "meta-index"})
97 1 : }
98 1 : if l.Footer.Length != 0 {
99 1 : if l.Footer.Length == levelDBFooterLen {
100 1 : blocks = append(blocks, NamedBlockHandle{l.Footer, "leveldb-footer"})
101 1 : } else {
102 1 : blocks = append(blocks, NamedBlockHandle{l.Footer, "footer"})
103 1 : }
104 : }
105 1 : slices.SortFunc(blocks, func(a, b NamedBlockHandle) int {
106 1 : return cmp.Compare(a.Offset, b.Offset)
107 1 : })
108 1 : return blocks
109 : }
110 :
111 : // Describe returns a description of the layout. If the verbose parameter is
112 : // true, details of the structure of each block are returned as well.
113 : // If verbose is true and fmtKV is non-nil, the output includes the KVs (as formatted by this function).
114 : func (l *Layout) Describe(
115 : verbose bool, r *Reader, fmtKV func(key *base.InternalKey, value []byte) string,
116 1 : ) string {
117 1 : ctx := context.TODO()
118 1 :
119 1 : blocks := l.orderedBlocks()
120 1 : formatting := rowblkFormatting
121 1 : if l.Format.BlockColumnar() {
122 1 : formatting = colblkFormatting
123 1 : }
124 :
125 1 : tp := treeprinter.New()
126 1 : root := tp.Child("sstable")
127 1 :
128 1 : for i := range blocks {
129 1 : b := &blocks[i]
130 1 : tpNode := root.Childf("%s offset: %d length: %d", b.Name, b.Offset, b.Length)
131 1 :
132 1 : if !verbose {
133 1 : continue
134 : }
135 1 : if b.Name == "filter" {
136 0 : continue
137 : }
138 :
139 1 : if b.Name == "footer" || b.Name == "leveldb-footer" {
140 1 : trailer, offset := make([]byte, b.Length), 0
141 1 : _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset))
142 1 :
143 1 : if b.Name == "footer" {
144 1 : checksumType := block.ChecksumType(trailer[0])
145 1 : tpNode.Childf("%03d checksum type: %s", offset, checksumType)
146 1 : trailer, offset = trailer[1:], offset+1
147 1 : }
148 :
149 1 : metaHandle, n := binary.Uvarint(trailer)
150 1 : metaLen, m := binary.Uvarint(trailer[n:])
151 1 : tpNode.Childf("%03d meta: offset=%d, length=%d", offset, metaHandle, metaLen)
152 1 : trailer, offset = trailer[n+m:], offset+n+m
153 1 :
154 1 : indexHandle, n := binary.Uvarint(trailer)
155 1 : indexLen, m := binary.Uvarint(trailer[n:])
156 1 : tpNode.Childf("%03d index: offset=%d, length=%d", offset, indexHandle, indexLen)
157 1 : trailer, offset = trailer[n+m:], offset+n+m
158 1 :
159 1 : trailing := 12
160 1 : if b.Name == "leveldb-footer" {
161 0 : trailing = 8
162 0 : }
163 :
164 1 : offset += len(trailer) - trailing
165 1 : trailer = trailer[len(trailer)-trailing:]
166 1 :
167 1 : if b.Name == "footer" {
168 1 : version := trailer[:4]
169 1 : tpNode.Childf("%03d version: %d", offset, binary.LittleEndian.Uint32(version))
170 1 : trailer, offset = trailer[4:], offset+4
171 1 : }
172 :
173 1 : magicNumber := trailer
174 1 : tpNode.Childf("%03d magic number: 0x%x", offset, magicNumber)
175 1 :
176 1 : continue
177 : }
178 :
179 : // Read the block and format it. Returns an error if we couldn't read the
180 : // block.
181 1 : err := func() error {
182 1 : var err error
183 1 : var h block.BufferHandle
184 1 : // Defer release of any block handle that will have been read.
185 1 : defer func() { h.Release() }()
186 :
187 1 : switch b.Name {
188 1 : case "data":
189 1 : h, err = r.readDataBlock(ctx, noEnv, noReadHandle, b.Handle)
190 1 : if err != nil {
191 0 : return err
192 0 : }
193 1 : if fmtKV == nil {
194 1 : formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), nil)
195 1 : } else {
196 1 : var lastKey InternalKey
197 1 : formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), func(key *base.InternalKey, value []byte) string {
198 1 : v := fmtKV(key, value)
199 1 : if base.InternalCompare(r.Compare, lastKey, *key) >= 0 {
200 1 : v += " WARNING: OUT OF ORDER KEYS!"
201 1 : }
202 1 : lastKey.Trailer = key.Trailer
203 1 : lastKey.UserKey = append(lastKey.UserKey[:0], key.UserKey...)
204 1 : return v
205 : })
206 : }
207 :
208 1 : case "range-del":
209 1 : h, err = r.readRangeDelBlock(ctx, noEnv, noReadHandle, b.Handle)
210 1 : if err != nil {
211 0 : return err
212 0 : }
213 : // TODO(jackson): colblk ignores fmtKV, because it doesn't
214 : // make sense in the context.
215 1 : formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
216 :
217 1 : case "range-key":
218 1 : h, err = r.readRangeKeyBlock(ctx, noEnv, noReadHandle, b.Handle)
219 1 : if err != nil {
220 0 : return err
221 0 : }
222 : // TODO(jackson): colblk ignores fmtKV, because it doesn't
223 : // make sense in the context.
224 1 : formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
225 :
226 1 : case "index", "top-index":
227 1 : h, err = r.readIndexBlock(ctx, noEnv, noReadHandle, b.Handle)
228 1 : if err != nil {
229 0 : return err
230 0 : }
231 1 : formatting.formatIndexBlock(tpNode, r, *b, h.BlockData())
232 :
233 1 : case "properties":
234 1 : h, err = r.readBlockInternal(ctx, noEnv, noReadHandle, b.Handle, noInitBlockMetadataFn)
235 1 : if err != nil {
236 0 : return err
237 0 : }
238 1 : iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
239 1 : iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
240 1 : fmt.Fprintf(w, "%05d %s (%d)", enc.Offset, key.UserKey, enc.Length)
241 1 : })
242 :
243 1 : case "meta-index":
244 1 : if b.Handle != r.metaindexBH {
245 0 : return base.AssertionFailedf("range-del block handle does not match rangeDelBH")
246 0 : }
247 1 : h, err = r.readMetaindexBlock(ctx, noEnv, noReadHandle)
248 1 : if err != nil {
249 0 : return err
250 0 : }
251 1 : iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
252 1 : iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
253 1 : var bh block.Handle
254 1 : var n int
255 1 : var vbih valblk.IndexHandle
256 1 : isValueBlocksIndexHandle := false
257 1 : if bytes.Equal(iter.Key().UserKey, []byte(metaValueIndexName)) {
258 1 : vbih, n, err = valblk.DecodeIndexHandle(value)
259 1 : bh = vbih.Handle
260 1 : isValueBlocksIndexHandle = true
261 1 : } else {
262 1 : bh, n = block.DecodeHandle(value)
263 1 : }
264 1 : if n == 0 || n != len(value) {
265 0 : fmt.Fprintf(w, "%04d [err: %s]\n", enc.Offset, err)
266 0 : return
267 0 : }
268 1 : var vbihStr string
269 1 : if isValueBlocksIndexHandle {
270 1 : vbihStr = fmt.Sprintf(" value-blocks-index-lengths: %d(num), %d(offset), %d(length)",
271 1 : vbih.BlockNumByteLength, vbih.BlockOffsetByteLength, vbih.BlockLengthByteLength)
272 1 : }
273 1 : fmt.Fprintf(w, "%04d %s block:%d/%d%s",
274 1 : uint64(enc.Offset), iter.Key().UserKey, bh.Offset, bh.Length, vbihStr)
275 : })
276 :
277 1 : case "value-block":
278 : // We don't peer into the value-block since it can't be interpreted
279 : // without the valueHandles.
280 1 : case "value-index":
281 : // We have already read the value-index to construct the list of
282 : // value-blocks, so no need to do it again.
283 : }
284 :
285 : // Format the trailer.
286 1 : trailer := make([]byte, block.TrailerLen)
287 1 : _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset+b.Length))
288 1 : algo := block.CompressionIndicator(trailer[0])
289 1 : checksum := binary.LittleEndian.Uint32(trailer[1:])
290 1 : tpNode.Childf("trailer [compression=%s checksum=0x%04x]", algo, checksum)
291 1 : return nil
292 : }()
293 1 : if err != nil {
294 0 : tpNode.Childf("error reading block: %v", err)
295 0 : }
296 : }
297 1 : return tp.String()
298 : }
299 :
// blockFormatting groups the functions Layout.Describe uses to render index,
// data and keyspan (range-del / range-key) blocks for one block encoding.
type blockFormatting struct {
	// formatIndexBlock renders an index or top-index block.
	formatIndexBlock formatBlockFunc
	// formatDataBlock renders a data block.
	formatDataBlock formatBlockFuncKV
	// formatKeyspanBlock renders a range-del or range-key block.
	formatKeyspanBlock formatBlockFuncKV
}
305 :
type (
	// formatBlockFunc describes the block contents in the byte slice as
	// children of the treeprinter node. The NamedBlockHandle identifies the
	// block being described.
	formatBlockFunc func(treeprinter.Node, *Reader, NamedBlockHandle, []byte) error
	// formatBlockFuncKV is like formatBlockFunc but additionally takes an
	// optional function for formatting individual key/value pairs; it may be
	// nil.
	formatBlockFuncKV func(treeprinter.Node, *Reader, NamedBlockHandle, []byte, func(*base.InternalKey, []byte) string) error
)
310 :
var (
	// rowblkFormatting is the formatting Layout.Describe uses unless the
	// table format reports BlockColumnar. Keyspan blocks are rendered with
	// the same function as data blocks.
	rowblkFormatting = blockFormatting{
		formatIndexBlock:   formatRowblkIndexBlock,
		formatDataBlock:    formatRowblkDataBlock,
		formatKeyspanBlock: formatRowblkDataBlock,
	}
	// colblkFormatting is the formatting Layout.Describe uses when the table
	// format reports BlockColumnar.
	colblkFormatting = blockFormatting{
		formatIndexBlock:   formatColblkIndexBlock,
		formatDataBlock:    formatColblkDataBlock,
		formatKeyspanBlock: formatColblkKeyspanBlock,
	}
)
323 :
324 1 : func formatColblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
325 1 : var iter colblk.IndexIter
326 1 : if err := iter.Init(r.Compare, r.Split, data, NoTransforms); err != nil {
327 0 : return err
328 0 : }
329 1 : defer iter.Close()
330 1 : i := 0
331 1 : for v := iter.First(); v; v = iter.Next() {
332 1 : bh, err := iter.BlockHandleWithProperties()
333 1 : if err != nil {
334 0 : return err
335 0 : }
336 1 : tp.Childf("%05d block:%d/%d\n", i, bh.Offset, bh.Length)
337 1 : i++
338 : }
339 1 : return nil
340 : }
341 :
342 : func formatColblkDataBlock(
343 : tp treeprinter.Node,
344 : r *Reader,
345 : b NamedBlockHandle,
346 : data []byte,
347 : fmtKV func(key *base.InternalKey, value []byte) string,
348 1 : ) error {
349 1 : var decoder colblk.DataBlockDecoder
350 1 : decoder.Init(r.keySchema, data)
351 1 : f := binfmt.New(data)
352 1 : decoder.Describe(f, tp)
353 1 :
354 1 : if fmtKV != nil {
355 1 : var iter colblk.DataBlockIter
356 1 : iter.InitOnce(r.keySchema, r.Compare, r.Split, describingLazyValueHandler{})
357 1 : if err := iter.Init(&decoder, block.IterTransforms{}); err != nil {
358 0 : return err
359 0 : }
360 1 : defer iter.Close()
361 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
362 1 : tp.Child(fmtKV(&kv.K, kv.V.ValueOrHandle))
363 1 : }
364 : }
365 1 : return nil
366 : }
367 :
// describingLazyValueHandler is a block.GetLazyValueForPrefixAndValueHandler
// that replaces a value handle with an in-place value describing the handle.
// It carries no state.
type describingLazyValueHandler struct{}
371 :
// Assert that describingLazyValueHandler implements the
// block.GetLazyValueForPrefixAndValueHandler interface.
var _ block.GetLazyValueForPrefixAndValueHandler = describingLazyValueHandler{}
375 :
376 : func (describingLazyValueHandler) GetLazyValueForPrefixAndValueHandle(
377 : handle []byte,
378 1 : ) base.LazyValue {
379 1 : vh := valblk.DecodeHandle(handle[1:])
380 1 : return base.LazyValue{ValueOrHandle: []byte(fmt.Sprintf("value handle %+v", vh))}
381 1 : }
382 :
383 : func formatColblkKeyspanBlock(
384 : tp treeprinter.Node,
385 : r *Reader,
386 : b NamedBlockHandle,
387 : data []byte,
388 : _ func(*base.InternalKey, []byte) string,
389 1 : ) error {
390 1 : var decoder colblk.KeyspanDecoder
391 1 : decoder.Init(data)
392 1 : f := binfmt.New(data)
393 1 : decoder.Describe(f, tp)
394 1 : return nil
395 1 : }
396 :
397 1 : func formatRowblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
398 1 : iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
399 1 : if err != nil {
400 0 : return err
401 0 : }
402 1 : iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
403 1 : bh, err := block.DecodeHandleWithProperties(value)
404 1 : if err != nil {
405 0 : fmt.Fprintf(w, "%05d [err: %s]\n", enc.Offset, err)
406 0 : return
407 0 : }
408 1 : fmt.Fprintf(w, "%05d block:%d/%d", enc.Offset, bh.Offset, bh.Length)
409 1 : if enc.IsRestart {
410 1 : fmt.Fprintf(w, " [restart]")
411 1 : }
412 : })
413 1 : return nil
414 : }
415 :
416 : func formatRowblkDataBlock(
417 : tp treeprinter.Node,
418 : r *Reader,
419 : b NamedBlockHandle,
420 : data []byte,
421 : fmtRecord func(key *base.InternalKey, value []byte) string,
422 1 : ) error {
423 1 : iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
424 1 : if err != nil {
425 0 : return err
426 0 : }
427 1 : iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
428 1 : // The format of the numbers in the record line is:
429 1 : //
430 1 : // (<total> = <length> [<shared>] + <unshared> + <value>)
431 1 : //
432 1 : // <total> is the total number of bytes for the record.
433 1 : // <length> is the size of the 3 varint encoded integers for <shared>,
434 1 : // <unshared>, and <value>.
435 1 : // <shared> is the number of key bytes shared with the previous key.
436 1 : // <unshared> is the number of unshared key bytes.
437 1 : // <value> is the number of value bytes.
438 1 : fmt.Fprintf(w, "%05d record (%d = %d [%d] + %d + %d)",
439 1 : uint64(enc.Offset), enc.Length,
440 1 : enc.Length-int32(enc.KeyUnshared+enc.ValueLen), enc.KeyShared, enc.KeyUnshared, enc.ValueLen)
441 1 : if enc.IsRestart {
442 1 : fmt.Fprint(w, " [restart]")
443 1 : }
444 1 : if fmtRecord != nil {
445 1 : if r.tableFormat < TableFormatPebblev3 || key.Kind() != InternalKeyKindSet {
446 1 : fmt.Fprintf(w, "\n %s", fmtRecord(key, value))
447 1 : } else if !block.ValuePrefix(value[0]).IsValueHandle() {
448 1 : fmt.Fprintf(w, "\n %s", fmtRecord(key, value[1:]))
449 1 : } else {
450 1 : vh := valblk.DecodeHandle(value[1:])
451 1 : fmt.Fprintf(w, "\n %s", fmtRecord(key, []byte(fmt.Sprintf("value handle %+v", vh))))
452 1 : }
453 : }
454 : })
455 1 : return nil
456 : }
457 :
458 1 : func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
459 1 : foot, err := parseFooter(data, 0, int64(len(data)))
460 1 : if err != nil {
461 0 : return Layout{}, err
462 0 : }
463 1 : decompressedMeta, err := decompressInMemory(data, foot.metaindexBH)
464 1 : if err != nil {
465 0 : return Layout{}, errors.Wrap(err, "decompressing metaindex")
466 0 : }
467 1 : meta, vbih, err := decodeMetaindex(decompressedMeta)
468 1 : if err != nil {
469 0 : return Layout{}, err
470 0 : }
471 1 : layout := Layout{
472 1 : MetaIndex: foot.metaindexBH,
473 1 : Properties: meta[metaPropertiesName],
474 1 : RangeDel: meta[metaRangeDelV2Name],
475 1 : RangeKey: meta[metaRangeKeyName],
476 1 : ValueIndex: vbih.Handle,
477 1 : Footer: foot.footerBH,
478 1 : Format: foot.format,
479 1 : }
480 1 : var props Properties
481 1 : decompressedProps, err := decompressInMemory(data, layout.Properties)
482 1 : if err != nil {
483 0 : return Layout{}, errors.Wrap(err, "decompressing properties")
484 0 : }
485 1 : if err := props.load(decompressedProps, map[string]struct{}{}); err != nil {
486 0 : return Layout{}, err
487 0 : }
488 :
489 1 : if props.IndexType == twoLevelIndex {
490 1 : decompressed, err := decompressInMemory(data, foot.indexBH)
491 1 : if err != nil {
492 0 : return Layout{}, errors.Wrap(err, "decompressing two-level index")
493 0 : }
494 1 : layout.TopIndex = foot.indexBH
495 1 : topLevelIter, err := newIndexIter(foot.format, comparer, decompressed)
496 1 : if err != nil {
497 0 : return Layout{}, err
498 0 : }
499 1 : err = forEachIndexEntry(topLevelIter, func(bhp block.HandleWithProperties) {
500 1 : layout.Index = append(layout.Index, bhp.Handle)
501 1 : })
502 1 : if err != nil {
503 0 : return Layout{}, err
504 0 : }
505 0 : } else {
506 0 : layout.Index = append(layout.Index, foot.indexBH)
507 0 : }
508 1 : for _, indexBH := range layout.Index {
509 1 : decompressed, err := decompressInMemory(data, indexBH)
510 1 : if err != nil {
511 0 : return Layout{}, errors.Wrap(err, "decompressing index block")
512 0 : }
513 1 : indexIter, err := newIndexIter(foot.format, comparer, decompressed)
514 1 : if err != nil {
515 0 : return Layout{}, err
516 0 : }
517 1 : err = forEachIndexEntry(indexIter, func(bhp block.HandleWithProperties) {
518 1 : layout.Data = append(layout.Data, bhp)
519 1 : })
520 1 : if err != nil {
521 0 : return Layout{}, err
522 0 : }
523 : }
524 :
525 1 : if layout.ValueIndex.Length > 0 {
526 0 : vbiBlock, err := decompressInMemory(data, layout.ValueIndex)
527 0 : if err != nil {
528 0 : return Layout{}, errors.Wrap(err, "decompressing value index")
529 0 : }
530 0 : layout.ValueBlock, err = valblk.DecodeIndex(vbiBlock, vbih)
531 0 : if err != nil {
532 0 : return Layout{}, err
533 0 : }
534 : }
535 :
536 1 : return layout, nil
537 : }
538 :
539 1 : func decompressInMemory(data []byte, bh block.Handle) ([]byte, error) {
540 1 : typ := block.CompressionIndicator(data[bh.Offset+bh.Length])
541 1 : var decompressed []byte
542 1 : if typ == block.NoCompressionIndicator {
543 1 : return data[bh.Offset : bh.Offset+bh.Length], nil
544 1 : }
545 : // Decode the length of the decompressed value.
546 1 : decodedLen, prefixLen, err := block.DecompressedLen(typ, data[bh.Offset:bh.Offset+bh.Length])
547 1 : if err != nil {
548 0 : return nil, err
549 0 : }
550 1 : decompressed = make([]byte, decodedLen)
551 1 : if err := block.DecompressInto(typ, data[int(bh.Offset)+prefixLen:bh.Offset+bh.Length], decompressed); err != nil {
552 0 : return nil, err
553 0 : }
554 1 : return decompressed, nil
555 : }
556 :
557 : func newIndexIter(
558 : tableFormat TableFormat, comparer *base.Comparer, data []byte,
559 1 : ) (block.IndexBlockIterator, error) {
560 1 : var iter block.IndexBlockIterator
561 1 : var err error
562 1 : if tableFormat <= TableFormatPebblev4 {
563 1 : iter = new(rowblk.IndexIter)
564 1 : err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
565 1 : } else {
566 1 : iter = new(colblk.IndexIter)
567 1 : err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
568 1 : }
569 1 : if err != nil {
570 0 : return nil, err
571 0 : }
572 1 : return iter, nil
573 : }
574 :
575 : func forEachIndexEntry(
576 : indexIter block.IndexBlockIterator, fn func(block.HandleWithProperties),
577 1 : ) error {
578 1 : for v := indexIter.First(); v; v = indexIter.Next() {
579 1 : bhp, err := indexIter.BlockHandleWithProperties()
580 1 : if err != nil {
581 0 : return err
582 0 : }
583 1 : fn(bhp)
584 : }
585 1 : return indexIter.Close()
586 : }
587 :
588 : func decodeMetaindex(
589 : data []byte,
590 1 : ) (meta map[string]block.Handle, vbih valblk.IndexHandle, err error) {
591 1 : i, err := rowblk.NewRawIter(bytes.Compare, data)
592 1 : if err != nil {
593 0 : return nil, valblk.IndexHandle{}, err
594 0 : }
595 1 : defer func() { err = firstError(err, i.Close()) }()
596 :
597 1 : meta = map[string]block.Handle{}
598 1 : for valid := i.First(); valid; valid = i.Next() {
599 1 : value := i.Value()
600 1 : if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
601 1 : var n int
602 1 : vbih, n, err = valblk.DecodeIndexHandle(i.Value())
603 1 : if err != nil {
604 0 : return nil, vbih, err
605 0 : }
606 1 : if n == 0 || n != len(value) {
607 0 : return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
608 0 : }
609 1 : } else {
610 1 : bh, n := block.DecodeHandle(value)
611 1 : if n == 0 || n != len(value) {
612 0 : return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
613 0 : }
614 1 : meta[string(i.Key().UserKey)] = bh
615 : }
616 : }
617 1 : return meta, vbih, nil
618 : }
619 :
// layoutWriter writes the structure of an sstable to durable storage. It
// accepts serialized blocks, writes them to storage and returns a block handle
// describing the offset and length of the block.
type layoutWriter struct {
	// writable is the destination of all writes. Abort and Finish set it to
	// nil, which is what IsFinished reports.
	writable objstorage.Writable

	// cacheOpts are used to remove blocks written to the sstable from the cache,
	// providing a defense in depth against bugs which cause cache collisions.
	cacheOpts sstableinternal.CacheOptions

	// options copied from WriterOptions
	tableFormat  TableFormat
	compression  block.Compression
	checksumType block.ChecksumType

	// offset tracks the current write offset within the writable.
	offset uint64
	// lastIndexBlockHandle holds the handle to the most recently-written index
	// block. It's updated by writeIndexBlock. When writing sstables with a
	// single-level index, this field will be updated once. When writing
	// sstables with a two-level index, the last update will set the two-level
	// index.
	lastIndexBlockHandle block.Handle
	// handles accumulates the metaindex entries recorded so far; Finish sorts
	// them by key and writes them out as the metaindex block.
	handles []metaIndexHandle
	// handlesBuf provides the backing storage for the encoded handles held in
	// handles.
	handlesBuf bytealloc.A
	// tmp is scratch space used when encoding block handles and the footer.
	tmp [blockHandleLikelyMaxLen]byte
	// buf is the scratch buffer used when compressing and checksumming blocks
	// that are not given a caller-supplied blockBuf.
	buf blockBuf
}
648 :
649 1 : func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
650 1 : return layoutWriter{
651 1 : writable: w,
652 1 : cacheOpts: opts.internal.CacheOpts,
653 1 : tableFormat: opts.TableFormat,
654 1 : compression: opts.Compression,
655 1 : checksumType: opts.Checksum,
656 1 : buf: blockBuf{
657 1 : checksummer: block.Checksummer{Type: opts.Checksum},
658 1 : },
659 1 : }
660 1 : }
661 :
// metaIndexHandle is a pending metaindex entry: a key and the already-encoded
// handle stored as its value.
type metaIndexHandle struct {
	// key is the metaindex key under which the handle is stored.
	key string
	// encodedBlockHandle is the encoded handle written as the entry's value.
	encodedBlockHandle []byte
}
666 :
667 : // Abort aborts writing the table, aborting the underlying writable too. Abort
668 : // is idempotent.
669 1 : func (w *layoutWriter) Abort() {
670 1 : if w.writable != nil {
671 1 : w.writable.Abort()
672 1 : w.writable = nil
673 1 : }
674 : }
675 :
676 : // WriteDataBlock constructs a trailer for the provided data block and writes
677 : // the block and trailer to the writer. It returns the block's handle.
678 1 : func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
679 1 : return w.writeBlock(b, w.compression, buf)
680 1 : }
681 :
682 : // WritePrecompressedDataBlock writes a pre-compressed data block and its
683 : // pre-computed trailer to the writer, returning it's block handle.
684 1 : func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (block.Handle, error) {
685 1 : return w.writePrecompressedBlock(blk)
686 1 : }
687 :
688 : // WriteIndexBlock constructs a trailer for the provided index (first or
689 : // second-level) and writes the block and trailer to the writer. It remembers
690 : // the last-written index block's handle and adds it to the file's meta index
691 : // when the writer is finished.
692 1 : func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
693 1 : h, err := w.writeBlock(b, w.compression, &w.buf)
694 1 : if err == nil {
695 1 : w.lastIndexBlockHandle = h
696 1 : }
697 1 : return h, err
698 : }
699 :
700 : // WriteFilterBlock finishes the provided filter, constructs a trailer and
701 : // writes the block and trailer to the writer. It automatically adds the filter
702 : // block to the file's meta index when the writer is finished.
703 1 : func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err error) {
704 1 : b, err := f.finish()
705 1 : if err != nil {
706 0 : return block.Handle{}, err
707 0 : }
708 1 : return w.writeNamedBlock(b, f.metaName())
709 : }
710 :
711 : // WritePropertiesBlock constructs a trailer for the provided properties block
712 : // and writes the block and trailer to the writer. It automatically adds the
713 : // properties block to the file's meta index when the writer is finished.
714 1 : func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
715 1 : return w.writeNamedBlock(b, metaPropertiesName)
716 1 : }
717 :
718 : // WriteRangeKeyBlock constructs a trailer for the provided range key block and
719 : // writes the block and trailer to the writer. It automatically adds the range
720 : // key block to the file's meta index when the writer is finished.
721 1 : func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
722 1 : return w.writeNamedBlock(b, metaRangeKeyName)
723 1 : }
724 :
725 : // WriteRangeDeletionBlock constructs a trailer for the provided range deletion
726 : // block and writes the block and trailer to the writer. It automatically adds
727 : // the range deletion block to the file's meta index when the writer is
728 : // finished.
729 1 : func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
730 1 : return w.writeNamedBlock(b, metaRangeDelV2Name)
731 1 : }
732 :
733 1 : func (w *layoutWriter) writeNamedBlock(b []byte, name string) (bh block.Handle, err error) {
734 1 : bh, err = w.writeBlock(b, block.NoCompression, &w.buf)
735 1 : if err == nil {
736 1 : w.recordToMetaindex(name, bh)
737 1 : }
738 1 : return bh, err
739 : }
740 :
741 : // WriteValueBlock writes a pre-finished value block (with the trailer) to the
742 : // writer.
743 1 : func (w *layoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
744 1 : return w.writePrecompressedBlock(blk)
745 1 : }
746 :
747 : func (w *layoutWriter) WriteValueIndexBlock(
748 : blk []byte, vbih valblk.IndexHandle,
749 1 : ) (block.Handle, error) {
750 1 : // NB: value index blocks are already finished and contain the block
751 1 : // trailer.
752 1 : // TODO(jackson): can this be refactored to make value blocks less
753 1 : // of a snowflake?
754 1 : off := w.offset
755 1 : w.clearFromCache(off)
756 1 : // Write the bytes to the file.
757 1 : if err := w.writable.Write(blk); err != nil {
758 0 : return block.Handle{}, err
759 0 : }
760 1 : l := uint64(len(blk))
761 1 : w.offset += l
762 1 :
763 1 : n := valblk.EncodeIndexHandle(w.tmp[:], vbih)
764 1 : w.recordToMetaindexRaw(metaValueIndexName, w.tmp[:n])
765 1 :
766 1 : return block.Handle{Offset: off, Length: l}, nil
767 : }
768 :
769 : func (w *layoutWriter) writeBlock(
770 : b []byte, compression block.Compression, buf *blockBuf,
771 1 : ) (block.Handle, error) {
772 1 : return w.writePrecompressedBlock(block.CompressAndChecksum(
773 1 : &buf.compressedBuf, b, compression, &buf.checksummer))
774 1 : }
775 :
776 : // writePrecompressedBlock writes a pre-compressed block and its
777 : // pre-computed trailer to the writer, returning it's block handle.
778 1 : func (w *layoutWriter) writePrecompressedBlock(blk block.PhysicalBlock) (block.Handle, error) {
779 1 : w.clearFromCache(w.offset)
780 1 : // Write the bytes to the file.
781 1 : n, err := blk.WriteTo(w.writable)
782 1 : if err != nil {
783 0 : return block.Handle{}, err
784 0 : }
785 1 : bh := block.Handle{Offset: w.offset, Length: uint64(blk.LengthWithoutTrailer())}
786 1 : w.offset += uint64(n)
787 1 : return bh, nil
788 : }
789 :
790 : // Write implements io.Writer. This is analogous to writePrecompressedBlock for
791 : // blocks that already incorporate the trailer, and don't need the callee to
792 : // return a BlockHandle.
793 0 : func (w *layoutWriter) Write(blockWithTrailer []byte) (n int, err error) {
794 0 : offset := w.offset
795 0 : w.clearFromCache(offset)
796 0 : w.offset += uint64(len(blockWithTrailer))
797 0 : if err := w.writable.Write(blockWithTrailer); err != nil {
798 0 : return 0, err
799 0 : }
800 0 : return len(blockWithTrailer), nil
801 : }
802 :
803 : // clearFromCache removes the block at the provided offset from the cache. This provides defense in
804 : // depth against bugs which cause cache collisions.
805 1 : func (w *layoutWriter) clearFromCache(offset uint64) {
806 1 : if w.cacheOpts.Cache != nil {
807 1 : // TODO(peter): Alternatively, we could add the uncompressed value to the
808 1 : // cache.
809 1 : w.cacheOpts.Cache.Delete(w.cacheOpts.CacheID, w.cacheOpts.FileNum, offset)
810 1 : }
811 : }
812 :
813 1 : func (w *layoutWriter) recordToMetaindex(key string, h block.Handle) {
814 1 : n := h.EncodeVarints(w.tmp[:])
815 1 : w.recordToMetaindexRaw(key, w.tmp[:n])
816 1 : }
817 :
818 1 : func (w *layoutWriter) recordToMetaindexRaw(key string, h []byte) {
819 1 : var encodedHandle []byte
820 1 : w.handlesBuf, encodedHandle = w.handlesBuf.Alloc(len(h))
821 1 : copy(encodedHandle, h)
822 1 : w.handles = append(w.handles, metaIndexHandle{key: key, encodedBlockHandle: encodedHandle})
823 1 : }
824 :
825 1 : func (w *layoutWriter) IsFinished() bool { return w.writable == nil }
826 :
827 : // Finish serializes the sstable, writing out the meta index block and sstable
828 : // footer and closing the file. It returns the total size of the resulting
829 : // ssatable.
830 1 : func (w *layoutWriter) Finish() (size uint64, err error) {
831 1 : // Sort the meta index handles by key and write the meta index block.
832 1 : slices.SortFunc(w.handles, func(a, b metaIndexHandle) int {
833 1 : return cmp.Compare(a.key, b.key)
834 1 : })
835 1 : bw := rowblk.Writer{RestartInterval: 1}
836 1 : for _, h := range w.handles {
837 1 : bw.AddRaw(unsafe.Slice(unsafe.StringData(h.key), len(h.key)), h.encodedBlockHandle)
838 1 : }
839 1 : metaIndexHandle, err := w.writeBlock(bw.Finish(), block.NoCompression, &w.buf)
840 1 : if err != nil {
841 0 : return 0, err
842 0 : }
843 :
844 : // Write the table footer.
845 1 : footer := footer{
846 1 : format: w.tableFormat,
847 1 : checksum: w.checksumType,
848 1 : metaindexBH: metaIndexHandle,
849 1 : indexBH: w.lastIndexBlockHandle,
850 1 : }
851 1 : encodedFooter := footer.encode(w.tmp[:])
852 1 : if err := w.writable.Write(encodedFooter); err != nil {
853 0 : return 0, err
854 0 : }
855 1 : w.offset += uint64(len(encodedFooter))
856 1 :
857 1 : err = w.writable.Finish()
858 1 : w.writable = nil
859 1 : return w.offset, err
860 : }
|