Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "encoding/binary"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/cache"
13 : "github.com/golang/snappy"
14 : )
15 :
16 1 : func decompressedLen(blockType blockType, b []byte) (int, int, error) {
17 1 : switch blockType {
18 0 : case noCompressionBlockType:
19 0 : return 0, 0, nil
20 1 : case snappyCompressionBlockType:
21 1 : l, err := snappy.DecodedLen(b)
22 1 : return l, 0, err
23 1 : case zstdCompressionBlockType:
24 1 : // This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
25 1 : // if we implement these algorithms in the future.
26 1 : decodedLenU64, varIntLen := binary.Uvarint(b)
27 1 : if varIntLen <= 0 {
28 0 : return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
29 0 : }
30 1 : return int(decodedLenU64), varIntLen, nil
31 0 : default:
32 0 : return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(blockType))
33 : }
34 : }
35 :
36 1 : func decompressInto(blockType blockType, compressed []byte, buf []byte) ([]byte, error) {
37 1 : var result []byte
38 1 : var err error
39 1 : switch blockType {
40 1 : case snappyCompressionBlockType:
41 1 : result, err = snappy.Decode(buf, compressed)
42 1 : case zstdCompressionBlockType:
43 1 : result, err = decodeZstd(buf, compressed)
44 : }
45 1 : if err != nil {
46 0 : return nil, base.MarkCorruptionError(err)
47 0 : }
48 1 : if len(result) != 0 && (len(result) != len(buf) || &result[0] != &buf[0]) {
49 0 : return nil, base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
50 0 : errors.Safe(result), errors.Safe(buf))
51 0 : }
52 1 : return result, nil
53 : }
54 :
55 : // decompressBlock decompresses an SST block, with manually-allocated space.
56 : // NB: If decompressBlock returns (nil, nil), no decompression was necessary and
57 : // the caller may use `b` directly.
58 0 : func decompressBlock(blockType blockType, b []byte) (*cache.Value, error) {
59 0 : if blockType == noCompressionBlockType {
60 0 : return nil, nil
61 0 : }
62 : // first obtain the decoded length.
63 0 : decodedLen, prefixLen, err := decompressedLen(blockType, b)
64 0 : if err != nil {
65 0 : return nil, err
66 0 : }
67 0 : b = b[prefixLen:]
68 0 : // Allocate sufficient space from the cache.
69 0 : decoded := cache.Alloc(decodedLen)
70 0 : decodedBuf := decoded.Buf()
71 0 : if _, err := decompressInto(blockType, b, decodedBuf); err != nil {
72 0 : cache.Free(decoded)
73 0 : return nil, err
74 0 : }
75 0 : return decoded, nil
76 : }
77 :
78 : // compressBlock compresses an SST block, using compressBuf as the desired destination.
79 : func compressBlock(
80 : compression Compression, b []byte, compressedBuf []byte,
81 1 : ) (blockType blockType, compressed []byte) {
82 1 : switch compression {
83 1 : case SnappyCompression:
84 1 : return snappyCompressionBlockType, snappy.Encode(compressedBuf, b)
85 1 : case NoCompression:
86 1 : return noCompressionBlockType, b
87 : }
88 :
89 1 : if len(compressedBuf) < binary.MaxVarintLen64 {
90 1 : compressedBuf = append(compressedBuf, make([]byte, binary.MaxVarintLen64-len(compressedBuf))...)
91 1 : }
92 1 : varIntLen := binary.PutUvarint(compressedBuf, uint64(len(b)))
93 1 : switch compression {
94 1 : case ZstdCompression:
95 1 : return zstdCompressionBlockType, encodeZstd(compressedBuf, varIntLen, b)
96 0 : default:
97 0 : return noCompressionBlockType, b
98 : }
99 : }
|