Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package block
6 :
7 : import (
8 : "encoding/binary"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/golang/snappy"
15 : )
16 :
// Compression is the per-block compression algorithm to use.
type Compression int

// The available compression types.
const (
	DefaultCompression Compression = iota
	NoCompression
	SnappyCompression
	ZstdCompression
	NCompression
)

// String implements fmt.Stringer, returning a human-readable name for the
// compression algorithm.
func (c Compression) String() string {
	// Sparse array literal keyed by the constants above; anything out of
	// range is reported as "Unknown", matching the historical behavior.
	names := [NCompression]string{
		DefaultCompression: "Default",
		NoCompression:      "NoCompression",
		SnappyCompression:  "Snappy",
		ZstdCompression:    "ZSTD",
	}
	if c < 0 || c >= NCompression {
		return "Unknown"
	}
	return names[c]
}

// CompressionFromString returns an sstable.Compression from its
// string representation. Inverse of c.String() above.
func CompressionFromString(s string) Compression {
	// Walk the known algorithms and match against their String() forms;
	// unrecognized input falls back to DefaultCompression.
	for c := DefaultCompression; c < NCompression; c++ {
		if c.String() == s {
			return c
		}
	}
	return DefaultCompression
}
62 :
63 : // CompressionIndicator is the byte stored physically within the block.Trailer
64 : // to indicate the compression type.
65 : //
66 : // TODO(jackson): Avoid exporting once all compression and decompression is
67 : // delegated to the block package.
68 : type CompressionIndicator byte
69 :
70 : // The block type gives the per-block compression format.
71 : // These constants are part of the file format and should not be changed.
72 : // They are different from the Compression constants because the latter
73 : // are designed so that the zero value of the Compression type means to
74 : // use the default compression (which is snappy).
75 : // Not all compression types listed here are supported.
76 : const (
77 : NoCompressionIndicator CompressionIndicator = 0
78 : SnappyCompressionIndicator CompressionIndicator = 1
79 : ZlibCompressionIndicator CompressionIndicator = 2
80 : Bzip2CompressionIndicator CompressionIndicator = 3
81 : Lz4CompressionIndicator CompressionIndicator = 4
82 : Lz4hcCompressionIndicator CompressionIndicator = 5
83 : XpressCompressionIndicator CompressionIndicator = 6
84 : ZstdCompressionIndicator CompressionIndicator = 7
85 : )
86 :
87 : // String implements fmt.Stringer.
88 1 : func (i CompressionIndicator) String() string {
89 1 : switch i {
90 1 : case 0:
91 1 : return "none"
92 1 : case 1:
93 1 : return "snappy"
94 0 : case 2:
95 0 : return "zlib"
96 0 : case 3:
97 0 : return "bzip2"
98 0 : case 4:
99 0 : return "lz4"
100 0 : case 5:
101 0 : return "lz4hc"
102 0 : case 6:
103 0 : return "xpress"
104 0 : case 7:
105 0 : return "zstd"
106 0 : default:
107 0 : panic(errors.Newf("sstable: unknown block type: %d", i))
108 : }
109 : }
110 :
111 : // DecompressedLen returns the length of the provided block once decompressed,
112 : // allowing the caller to allocate a buffer exactly sized to the decompressed
113 : // payload. For some compression algorithms, the payload is prefixed with a
114 : // varint encoding the length of the decompressed block. In such cases, a
115 : // non-zero prefixLength is returned indicating the length of this prefix.
116 : func DecompressedLen(
117 : algo CompressionIndicator, b []byte,
118 2 : ) (decompressedLen int, prefixLength int, err error) {
119 2 : switch algo {
120 0 : case NoCompressionIndicator:
121 0 : return 0, 0, nil
122 2 : case SnappyCompressionIndicator:
123 2 : l, err := snappy.DecodedLen(b)
124 2 : return l, 0, err
125 2 : case ZstdCompressionIndicator:
126 2 : // This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
127 2 : // if we implement these algorithms in the future.
128 2 : decodedLenU64, varIntLen := binary.Uvarint(b)
129 2 : if varIntLen <= 0 {
130 0 : return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
131 0 : }
132 2 : return int(decodedLenU64), varIntLen, nil
133 0 : default:
134 0 : return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
135 : }
136 : }
137 :
138 : // DecompressInto decompresses compressed into buf. The buf slice must have the
139 : // exact size as the decompressed value. Callers may use DecompressedLen to
140 : // determine the correct size.
141 2 : func DecompressInto(algo CompressionIndicator, compressed []byte, buf []byte) error {
142 2 : var result []byte
143 2 : var err error
144 2 : switch algo {
145 2 : case SnappyCompressionIndicator:
146 2 : result, err = snappy.Decode(buf, compressed)
147 2 : case ZstdCompressionIndicator:
148 2 : result, err = decodeZstd(buf, compressed)
149 0 : default:
150 0 : return base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
151 : }
152 2 : if err != nil {
153 1 : return base.MarkCorruptionError(err)
154 1 : }
155 2 : if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
156 0 : return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
157 0 : errors.Safe(result), errors.Safe(buf))
158 0 : }
159 2 : return nil
160 : }
161 :
// PhysicalBlock represents a block (possibly compressed) as it is stored
// physically on disk, including its trailer.
type PhysicalBlock struct {
	// data contains the possibly compressed block data.
	data []byte
	// trailer holds the compression indicator byte followed by the block
	// checksum (see CompressAndChecksum, which builds it via MakeTrailer).
	trailer Trailer
}
169 :
170 : // LengthWithTrailer returns the length of the data block, including the trailer.
171 2 : func (b *PhysicalBlock) LengthWithTrailer() int {
172 2 : return len(b.data) + TrailerLen
173 2 : }
174 :
175 : // LengthWithoutTrailer returns the length of the data block, excluding the trailer.
176 2 : func (b *PhysicalBlock) LengthWithoutTrailer() int {
177 2 : return len(b.data)
178 2 : }
179 :
180 : // CloneWithByteAlloc returns a deep copy of the block, using the provided
181 : // bytealloc.A to allocate memory for the new copy.
182 1 : func (b *PhysicalBlock) CloneWithByteAlloc(a *bytealloc.A) PhysicalBlock {
183 1 : var data []byte
184 1 : *a, data = (*a).Alloc(len(b.data))
185 1 : copy(data, b.data)
186 1 : return PhysicalBlock{
187 1 : data: data,
188 1 : trailer: b.trailer,
189 1 : }
190 1 : }
191 :
192 : // IsCompressed returns true if the block is compressed.
193 2 : func (b *PhysicalBlock) IsCompressed() bool {
194 2 : return CompressionIndicator(b.trailer[0]) != NoCompressionIndicator
195 2 : }
196 :
197 : // WriteTo writes the block (including its trailer) to the provided Writable. If
198 : // err == nil, n is the number of bytes successfully written to the Writable.
199 2 : func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
200 2 : if err := w.Write(b.data); err != nil {
201 0 : return 0, err
202 0 : }
203 2 : if err := w.Write(b.trailer[:]); err != nil {
204 0 : return 0, err
205 0 : }
206 2 : return len(b.data) + len(b.trailer), nil
207 : }
208 :
209 : // CompressAndChecksum compresses and checksums the provided block, returning
210 : // the compressed block and its trailer. The dst argument is used for the
211 : // compressed payload if it's sufficiently large. If it's not, a new buffer is
212 : // allocated and *dst is updated to point to it.
213 : //
214 : // If the compressed block is not sufficiently smaller than the original block,
215 : // the compressed payload is discarded and the original, uncompressed block is
216 : // used to avoid unnecessary decompression overhead at read time.
217 : func CompressAndChecksum(
218 : dst *[]byte, block []byte, compression Compression, checksummer *Checksummer,
219 2 : ) PhysicalBlock {
220 2 : // Compress the buffer, discarding the result if the improvement isn't at
221 2 : // least 12.5%.
222 2 : algo := NoCompressionIndicator
223 2 : if compression != NoCompression {
224 2 : var compressed []byte
225 2 : algo, compressed = compress(compression, block, *dst)
226 2 : if algo != NoCompressionIndicator && cap(compressed) > cap(*dst) {
227 2 : *dst = compressed[:cap(compressed)]
228 2 : }
229 2 : if len(compressed) < len(block)-len(block)/8 {
230 2 : block = compressed
231 2 : } else {
232 2 : algo = NoCompressionIndicator
233 2 : }
234 : }
235 :
236 : // Calculate the checksum.
237 2 : pb := PhysicalBlock{data: block}
238 2 : pb.trailer[0] = byte(algo)
239 2 : checksum := checksummer.Checksum(block, pb.trailer[:1])
240 2 : pb.trailer = MakeTrailer(byte(algo), checksum)
241 2 : return pb
242 : }
243 :
244 : // compress compresses a sstable block, using dstBuf as the desired destination.
245 : func compress(
246 : compression Compression, b []byte, dstBuf []byte,
247 2 : ) (indicator CompressionIndicator, compressed []byte) {
248 2 : switch compression {
249 2 : case SnappyCompression:
250 2 : return SnappyCompressionIndicator, snappy.Encode(dstBuf, b)
251 1 : case NoCompression:
252 1 : return NoCompressionIndicator, b
253 2 : case ZstdCompression:
254 2 : if len(dstBuf) < binary.MaxVarintLen64 {
255 2 : dstBuf = append(dstBuf, make([]byte, binary.MaxVarintLen64-len(dstBuf))...)
256 2 : }
257 2 : varIntLen := binary.PutUvarint(dstBuf, uint64(len(b)))
258 2 : return ZstdCompressionIndicator, encodeZstd(dstBuf, varIntLen, b)
259 0 : default:
260 0 : panic("unreachable")
261 : }
262 : }
|