LCOV - code coverage report
Current view: top level - pebble/sstable/block - compression.go (source / functions) Hit Total Coverage
Test: 2024-08-13 08:17Z 57ff76d1 - tests + meta.lcov Lines: 102 139 73.4 %
Date: 2024-08-13 08:18:14 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package block
       6             : 
       7             : import (
       8             :         "encoding/binary"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/golang/snappy"
      15             : )
      16             : 
      17             : // Compression is the per-block compression algorithm to use.
      18             : type Compression int
      19             : 
      20             : // The available compression types.
      21             : const (
      22             :         DefaultCompression Compression = iota
      23             :         NoCompression
      24             :         SnappyCompression
      25             :         ZstdCompression
      26             :         NCompression
      27             : )
      28             : 
      29             : // String implements fmt.Stringer, returning a human-readable name for the
      30             : // compression algorithm.
      31           2 : func (c Compression) String() string {
      32           2 :         switch c {
      33           0 :         case DefaultCompression:
      34           0 :                 return "Default"
      35           2 :         case NoCompression:
      36           2 :                 return "NoCompression"
      37           2 :         case SnappyCompression:
      38           2 :                 return "Snappy"
      39           2 :         case ZstdCompression:
      40           2 :                 return "ZSTD"
      41           0 :         default:
      42           0 :                 return "Unknown"
      43             :         }
      44             : }
      45             : 
      46             : // CompressionFromString returns an sstable.Compression from its
      47             : // string representation. Inverse of c.String() above.
      48           2 : func CompressionFromString(s string) Compression {
      49           2 :         switch s {
      50           0 :         case "Default":
      51           0 :                 return DefaultCompression
      52           2 :         case "NoCompression":
      53           2 :                 return NoCompression
      54           2 :         case "Snappy":
      55           2 :                 return SnappyCompression
      56           1 :         case "ZSTD":
      57           1 :                 return ZstdCompression
      58           2 :         default:
      59           2 :                 return DefaultCompression
      60             :         }
      61             : }
      62             : 
      63             : // CompressionIndicator is the byte stored physically within the block.Trailer
      64             : // to indicate the compression type.
      65             : //
      66             : // TODO(jackson): Avoid exporting once all compression and decompression is
      67             : // delegated to the block package.
      68             : type CompressionIndicator byte
      69             : 
      70             : // The block type gives the per-block compression format.
      71             : // These constants are part of the file format and should not be changed.
      72             : // They are different from the Compression constants because the latter
      73             : // are designed so that the zero value of the Compression type means to
      74             : // use the default compression (which is snappy).
      75             : // Not all compression types listed here are supported.
      76             : const (
      77             :         NoCompressionIndicator     CompressionIndicator = 0
      78             :         SnappyCompressionIndicator CompressionIndicator = 1
      79             :         ZlibCompressionIndicator   CompressionIndicator = 2
      80             :         Bzip2CompressionIndicator  CompressionIndicator = 3
      81             :         Lz4CompressionIndicator    CompressionIndicator = 4
      82             :         Lz4hcCompressionIndicator  CompressionIndicator = 5
      83             :         XpressCompressionIndicator CompressionIndicator = 6
      84             :         ZstdCompressionIndicator   CompressionIndicator = 7
      85             : )
      86             : 
      87             : // String implements fmt.Stringer.
      88           1 : func (i CompressionIndicator) String() string {
      89           1 :         switch i {
      90           1 :         case 0:
      91           1 :                 return "none"
      92           1 :         case 1:
      93           1 :                 return "snappy"
      94           0 :         case 2:
      95           0 :                 return "zlib"
      96           0 :         case 3:
      97           0 :                 return "bzip2"
      98           0 :         case 4:
      99           0 :                 return "lz4"
     100           0 :         case 5:
     101           0 :                 return "lz4hc"
     102           0 :         case 6:
     103           0 :                 return "xpress"
     104           0 :         case 7:
     105           0 :                 return "zstd"
     106           0 :         default:
     107           0 :                 panic(errors.Newf("sstable: unknown block type: %d", i))
     108             :         }
     109             : }
     110             : 
     111             : // DecompressedLen returns the length of the provided block once decompressed,
     112             : // allowing the caller to allocate a buffer exactly sized to the decompressed
     113             : // payload. For some compression algorithms, the payload is prefixed with a
     114             : // varint encoding the length of the decompressed block. In such cases, a
     115             : // non-zero prefixLength is returned indicating the length of this prefix.
     116             : func DecompressedLen(
     117             :         algo CompressionIndicator, b []byte,
     118           2 : ) (decompressedLen int, prefixLength int, err error) {
     119           2 :         switch algo {
     120           0 :         case NoCompressionIndicator:
     121           0 :                 return 0, 0, nil
     122           2 :         case SnappyCompressionIndicator:
     123           2 :                 l, err := snappy.DecodedLen(b)
     124           2 :                 return l, 0, err
     125           2 :         case ZstdCompressionIndicator:
     126           2 :                 // This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
     127           2 :                 // if we implement these algorithms in the future.
     128           2 :                 decodedLenU64, varIntLen := binary.Uvarint(b)
     129           2 :                 if varIntLen <= 0 {
     130           0 :                         return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
     131           0 :                 }
     132           2 :                 return int(decodedLenU64), varIntLen, nil
     133           0 :         default:
     134           0 :                 return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
     135             :         }
     136             : }
     137             : 
     138             : // DecompressInto decompresses compressed into buf. The buf slice must have the
     139             : // exact size as the decompressed value. Callers may use DecompressedLen to
     140             : // determine the correct size.
     141           2 : func DecompressInto(algo CompressionIndicator, compressed []byte, buf []byte) error {
     142           2 :         var result []byte
     143           2 :         var err error
     144           2 :         switch algo {
     145           2 :         case SnappyCompressionIndicator:
     146           2 :                 result, err = snappy.Decode(buf, compressed)
     147           2 :         case ZstdCompressionIndicator:
     148           2 :                 result, err = decodeZstd(buf, compressed)
     149           0 :         default:
     150           0 :                 return base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
     151             :         }
     152           2 :         if err != nil {
     153           1 :                 return base.MarkCorruptionError(err)
     154           1 :         }
     155           2 :         if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
     156           0 :                 return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
     157           0 :                         errors.Safe(result), errors.Safe(buf))
     158           0 :         }
     159           2 :         return nil
     160             : }
     161             : 
     162             : // PhysicalBlock represents a block (possibly compressed) as it is stored
     163             : // physically on disk, including its trailer.
     164             : type PhysicalBlock struct {
     165             :         // data contains the possibly compressed block data.
     166             :         data    []byte
     167             :         trailer Trailer
     168             : }
     169             : 
     170             : // LengthWithTrailer returns the length of the data block, including the trailer.
     171           2 : func (b *PhysicalBlock) LengthWithTrailer() int {
     172           2 :         return len(b.data) + TrailerLen
     173           2 : }
     174             : 
     175             : // LengthWithoutTrailer returns the length of the data block, excluding the trailer.
     176           2 : func (b *PhysicalBlock) LengthWithoutTrailer() int {
     177           2 :         return len(b.data)
     178           2 : }
     179             : 
     180             : // CloneWithByteAlloc returns a deep copy of the block, using the provided
     181             : // bytealloc.A to allocate memory for the new copy.
     182           1 : func (b *PhysicalBlock) CloneWithByteAlloc(a *bytealloc.A) PhysicalBlock {
     183           1 :         var data []byte
     184           1 :         *a, data = (*a).Alloc(len(b.data))
     185           1 :         copy(data, b.data)
     186           1 :         return PhysicalBlock{
     187           1 :                 data:    data,
     188           1 :                 trailer: b.trailer,
     189           1 :         }
     190           1 : }
     191             : 
     192             : // IsCompressed returns true if the block is compressed.
     193           2 : func (b *PhysicalBlock) IsCompressed() bool {
     194           2 :         return CompressionIndicator(b.trailer[0]) != NoCompressionIndicator
     195           2 : }
     196             : 
     197             : // WriteTo writes the block (including its trailer) to the provided Writable. If
     198             : // err == nil, n is the number of bytes successfully written to the Writable.
     199           2 : func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
     200           2 :         if err := w.Write(b.data); err != nil {
     201           0 :                 return 0, err
     202           0 :         }
     203           2 :         if err := w.Write(b.trailer[:]); err != nil {
     204           0 :                 return 0, err
     205           0 :         }
     206           2 :         return len(b.data) + len(b.trailer), nil
     207             : }
     208             : 
     209             : // CompressAndChecksum compresses and checksums the provided block, returning
     210             : // the compressed block and its trailer. The dst argument is used for the
     211             : // compressed payload if it's sufficiently large. If it's not, a new buffer is
     212             : // allocated and *dst is updated to point to it.
     213             : //
     214             : // If the compressed block is not sufficiently smaller than the original block,
     215             : // the compressed payload is discarded and the original, uncompressed block is
     216             : // used to avoid unnecessary decompression overhead at read time.
     217             : func CompressAndChecksum(
     218             :         dst *[]byte, block []byte, compression Compression, checksummer *Checksummer,
     219           2 : ) PhysicalBlock {
     220           2 :         // Compress the buffer, discarding the result if the improvement isn't at
     221           2 :         // least 12.5%.
     222           2 :         algo := NoCompressionIndicator
     223           2 :         if compression != NoCompression {
     224           2 :                 var compressed []byte
     225           2 :                 algo, compressed = compress(compression, block, *dst)
     226           2 :                 if algo != NoCompressionIndicator && cap(compressed) > cap(*dst) {
     227           2 :                         *dst = compressed[:cap(compressed)]
     228           2 :                 }
     229           2 :                 if len(compressed) < len(block)-len(block)/8 {
     230           2 :                         block = compressed
     231           2 :                 } else {
     232           2 :                         algo = NoCompressionIndicator
     233           2 :                 }
     234             :         }
     235             : 
     236             :         // Calculate the checksum.
     237           2 :         pb := PhysicalBlock{data: block}
     238           2 :         pb.trailer[0] = byte(algo)
     239           2 :         checksum := checksummer.Checksum(block, pb.trailer[:1])
     240           2 :         pb.trailer = MakeTrailer(byte(algo), checksum)
     241           2 :         return pb
     242             : }
     243             : 
     244             : // compress compresses a sstable block, using dstBuf as the desired destination.
     245             : func compress(
     246             :         compression Compression, b []byte, dstBuf []byte,
     247           2 : ) (indicator CompressionIndicator, compressed []byte) {
     248           2 :         switch compression {
     249           2 :         case SnappyCompression:
     250           2 :                 return SnappyCompressionIndicator, snappy.Encode(dstBuf, b)
     251           1 :         case NoCompression:
     252           1 :                 return NoCompressionIndicator, b
     253           2 :         case ZstdCompression:
     254           2 :                 if len(dstBuf) < binary.MaxVarintLen64 {
     255           2 :                         dstBuf = append(dstBuf, make([]byte, binary.MaxVarintLen64-len(dstBuf))...)
     256           2 :                 }
     257           2 :                 varIntLen := binary.PutUvarint(dstBuf, uint64(len(b)))
     258           2 :                 return ZstdCompressionIndicator, encodeZstd(dstBuf, varIntLen, b)
     259           0 :         default:
     260           0 :                 panic("unreachable")
     261             :         }
     262             : }

Generated by: LCOV version 1.14