Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "encoding/binary"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/cache"
13 : "github.com/golang/snappy"
14 : )
15 :
16 1 : func decompressedLen(blockType blockType, b []byte) (int, int, error) {
17 1 : switch blockType {
18 0 : case noCompressionBlockType:
19 0 : return 0, 0, nil
20 1 : case snappyCompressionBlockType:
21 1 : l, err := snappy.DecodedLen(b)
22 1 : return l, 0, err
23 1 : case zstdCompressionBlockType:
24 1 : // This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
25 1 : // if we implement these algorithms in the future.
26 1 : decodedLenU64, varIntLen := binary.Uvarint(b)
27 1 : if varIntLen <= 0 {
28 0 : return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
29 0 : }
30 1 : return int(decodedLenU64), varIntLen, nil
31 0 : default:
32 0 : return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(blockType))
33 : }
34 : }
35 :
36 : // decompressInto decompresses compressed into buf. The buf slice must have the
37 : // exact size as the decompressed value.
38 1 : func decompressInto(blockType blockType, compressed []byte, buf []byte) error {
39 1 : var result []byte
40 1 : var err error
41 1 : switch blockType {
42 1 : case snappyCompressionBlockType:
43 1 : result, err = snappy.Decode(buf, compressed)
44 1 : case zstdCompressionBlockType:
45 1 : result, err = decodeZstd(buf, compressed)
46 0 : default:
47 0 : return base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(blockType))
48 : }
49 1 : if err != nil {
50 0 : return base.MarkCorruptionError(err)
51 0 : }
52 1 : if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
53 0 : return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
54 0 : errors.Safe(result), errors.Safe(buf))
55 0 : }
56 1 : return nil
57 : }
58 :
59 : // decompressBlock decompresses an SST block, with manually-allocated space.
60 : // NB: If decompressBlock returns (nil, nil), no decompression was necessary and
61 : // the caller may use `b` directly.
62 0 : func decompressBlock(blockType blockType, b []byte) (*cache.Value, error) {
63 0 : if blockType == noCompressionBlockType {
64 0 : return nil, nil
65 0 : }
66 : // first obtain the decoded length.
67 0 : decodedLen, prefixLen, err := decompressedLen(blockType, b)
68 0 : if err != nil {
69 0 : return nil, err
70 0 : }
71 0 : b = b[prefixLen:]
72 0 : // Allocate sufficient space from the cache.
73 0 : decoded := cache.Alloc(decodedLen)
74 0 : decodedBuf := decoded.Buf()
75 0 : if err := decompressInto(blockType, b, decodedBuf); err != nil {
76 0 : cache.Free(decoded)
77 0 : return nil, err
78 0 : }
79 0 : return decoded, nil
80 : }
81 :
82 : // compressBlock compresses an SST block, using compressBuf as the desired destination.
83 : func compressBlock(
84 : compression Compression, b []byte, compressedBuf []byte,
85 1 : ) (blockType blockType, compressed []byte) {
86 1 : switch compression {
87 1 : case SnappyCompression:
88 1 : return snappyCompressionBlockType, snappy.Encode(compressedBuf, b)
89 1 : case NoCompression:
90 1 : return noCompressionBlockType, b
91 : }
92 :
93 1 : if len(compressedBuf) < binary.MaxVarintLen64 {
94 1 : compressedBuf = append(compressedBuf, make([]byte, binary.MaxVarintLen64-len(compressedBuf))...)
95 1 : }
96 1 : varIntLen := binary.PutUvarint(compressedBuf, uint64(len(b)))
97 1 : switch compression {
98 1 : case ZstdCompression:
99 1 : return zstdCompressionBlockType, encodeZstd(compressedBuf, varIntLen, b)
100 0 : default:
101 0 : return noCompressionBlockType, b
102 : }
103 : }
|