Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "encoding/binary"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/cache"
13 : "github.com/golang/snappy"
14 : )
15 :
16 2 : func decompressedLen(blockType blockType, b []byte) (int, int, error) {
17 2 : switch blockType {
18 0 : case noCompressionBlockType:
19 0 : return 0, 0, nil
20 2 : case snappyCompressionBlockType:
21 2 : l, err := snappy.DecodedLen(b)
22 2 : return l, 0, err
23 2 : case zstdCompressionBlockType:
24 2 : // This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
25 2 : // if we implement these algorithms in the future.
26 2 : decodedLenU64, varIntLen := binary.Uvarint(b)
27 2 : if varIntLen <= 0 {
28 0 : return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
29 0 : }
30 2 : return int(decodedLenU64), varIntLen, nil
31 0 : default:
32 0 : return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(blockType))
33 : }
34 : }
35 :
36 2 : func decompressInto(blockType blockType, compressed []byte, buf []byte) ([]byte, error) {
37 2 : var result []byte
38 2 : var err error
39 2 : switch blockType {
40 2 : case snappyCompressionBlockType:
41 2 : result, err = snappy.Decode(buf, compressed)
42 2 : case zstdCompressionBlockType:
43 2 : result, err = decodeZstd(buf, compressed)
44 : }
45 2 : if err != nil {
46 1 : return nil, base.MarkCorruptionError(err)
47 1 : }
48 2 : if len(result) != 0 && (len(result) != len(buf) || &result[0] != &buf[0]) {
49 0 : return nil, base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
50 0 : errors.Safe(result), errors.Safe(buf))
51 0 : }
52 2 : return result, nil
53 : }
54 :
55 : // decompressBlock decompresses an SST block, with manually-allocated space.
56 : // NB: If decompressBlock returns (nil, nil), no decompression was necessary and
57 : // the caller may use `b` directly.
58 1 : func decompressBlock(blockType blockType, b []byte) (*cache.Value, error) {
59 1 : if blockType == noCompressionBlockType {
60 1 : return nil, nil
61 1 : }
62 : // first obtain the decoded length.
63 1 : decodedLen, prefixLen, err := decompressedLen(blockType, b)
64 1 : if err != nil {
65 0 : return nil, err
66 0 : }
67 1 : b = b[prefixLen:]
68 1 : // Allocate sufficient space from the cache.
69 1 : decoded := cache.Alloc(decodedLen)
70 1 : decodedBuf := decoded.Buf()
71 1 : if _, err := decompressInto(blockType, b, decodedBuf); err != nil {
72 1 : cache.Free(decoded)
73 1 : return nil, err
74 1 : }
75 1 : return decoded, nil
76 : }
77 :
78 : // compressBlock compresses an SST block, using compressBuf as the desired destination.
79 : func compressBlock(
80 : compression Compression, b []byte, compressedBuf []byte,
81 2 : ) (blockType blockType, compressed []byte) {
82 2 : switch compression {
83 2 : case SnappyCompression:
84 2 : return snappyCompressionBlockType, snappy.Encode(compressedBuf, b)
85 2 : case NoCompression:
86 2 : return noCompressionBlockType, b
87 : }
88 :
89 2 : if len(compressedBuf) < binary.MaxVarintLen64 {
90 2 : compressedBuf = append(compressedBuf, make([]byte, binary.MaxVarintLen64-len(compressedBuf))...)
91 2 : }
92 2 : varIntLen := binary.PutUvarint(compressedBuf, uint64(len(b)))
93 2 : switch compression {
94 2 : case ZstdCompression:
95 2 : return zstdCompressionBlockType, encodeZstd(compressedBuf, varIntLen, b)
96 0 : default:
97 0 : return noCompressionBlockType, b
98 : }
99 : }
|