LCOV - code coverage report
Current view: top level - pebble/batchrepr - reader.go (source / functions) Hit Total Coverage
Test: 2024-03-27 08:24Z 5babbee7 - tests only.lcov Lines: 68 72 94.4 %
Date: 2024-03-27 08:24:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package batchrepr provides interfaces for reading and writing the binary
       6             : // batch representation. This batch representation is used in-memory while
       7             : // constructing a batch and on-disk within the write-ahead log.
       8             : package batchrepr
       9             : 
      10             : import (
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "unsafe"
      14             : 
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/pkg/errors"
      17             : )
      18             : 
// ErrInvalidBatch indicates that a batch is invalid or otherwise corrupted.
// It is marked as a corruption error via base.MarkCorruptionError, and is the
// error that Reader.Next wraps when an entry cannot be decoded.
var ErrInvalidBatch = base.MarkCorruptionError(errors.New("pebble: invalid batch"))

const (
	// HeaderLen is the length of the batch header in bytes. The header is
	// read as a little-endian uint64 sequence number occupying bytes
	// [0, countOffset) followed by a little-endian uint32 key count
	// occupying bytes [countOffset, HeaderLen).
	HeaderLen = 12
	// countOffset is the index into the batch representation where the
	// count is stored, encoded as a little-endian uint32.
	countOffset = 8
)
      29             : 
      30             : // IsEmpty returns true iff the batch contains zero keys.
      31           1 : func IsEmpty(repr []byte) bool {
      32           1 :         return len(repr) <= HeaderLen
      33           1 : }
      34             : 
      35             : // ReadHeader reads the contents of the batch header. If the repr is too small
      36             : // to contain a valid batch header, ReadHeader returns ok=false.
      37           1 : func ReadHeader(repr []byte) (h Header, ok bool) {
      38           1 :         if len(repr) < HeaderLen {
      39           1 :                 return h, false
      40           1 :         }
      41           1 :         return Header{
      42           1 :                 SeqNum: ReadSeqNum(repr),
      43           1 :                 Count:  binary.LittleEndian.Uint32(repr[countOffset:HeaderLen]),
      44           1 :         }, true
      45             : }
      46             : 
// Header describes the contents of a batch header, as decoded by ReadHeader.
type Header struct {
	// SeqNum is the sequence number at which the batch is committed. A batch
	// that has not yet committed will have a zero sequence number.
	SeqNum uint64
	// Count is the count of keys written to the batch.
	Count uint32
}
      55             : 
      56             : // String returns a string representation of the header's contents.
      57           1 : func (h Header) String() string {
      58           1 :         return fmt.Sprintf("[seqNum=%d,count=%d]", h.SeqNum, h.Count)
      59           1 : }
      60             : 
      61             : // ReadSeqNum reads the sequence number encoded within the batch. ReadSeqNum
      62             : // does not validate that the repr is valid. It's exported only for very
      63             : // performance sensitive code paths that should not necessarily read the rest of
      64             : // the header as well.
      65           1 : func ReadSeqNum(repr []byte) uint64 {
      66           1 :         return binary.LittleEndian.Uint64(repr[:countOffset])
      67           1 : }
      68             : 
      69             : // Read constructs a Reader from an encoded batch representation, ignoring the
      70             : // contents of the Header.
      71           1 : func Read(repr []byte) (r Reader) {
      72           1 :         if len(repr) <= HeaderLen {
      73           1 :                 return nil
      74           1 :         }
      75           1 :         return repr[HeaderLen:]
      76             : }
      77             : 
// Reader iterates over the entries contained in a batch. It is a view of the
// not-yet-consumed bytes of the batch body: Read produces it by slicing off
// the header, and each call to Next re-slices it past the entry just decoded.
type Reader []byte
      80             : 
      81             : // Next returns the next entry in this batch, if there is one. If the reader has
      82             : // reached the end of the batch, Next returns ok=false and a nil error. If the
      83             : // batch is corrupt and the next entry is illegible, Next returns ok=false and a
      84             : // non-nil error.
      85           1 : func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, ok bool, err error) {
      86           1 :         if len(*r) == 0 {
      87           1 :                 return 0, nil, nil, false, nil
      88           1 :         }
      89           1 :         kind = base.InternalKeyKind((*r)[0])
      90           1 :         if kind > base.InternalKeyKindMax {
      91           1 :                 return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "invalid key kind 0x%x", (*r)[0])
      92           1 :         }
      93           1 :         *r, ukey, ok = DecodeStr((*r)[1:])
      94           1 :         if !ok {
      95           1 :                 return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding user key")
      96           1 :         }
      97           1 :         switch kind {
      98             :         case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete,
      99             :                 base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete,
     100           1 :                 base.InternalKeyKindDeleteSized:
     101           1 :                 *r, value, ok = DecodeStr(*r)
     102           1 :                 if !ok {
     103           1 :                         return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
     104           1 :                 }
     105             :         }
     106           1 :         return kind, ukey, value, true, nil
     107             : }
     108             : 
     109             : // DecodeStr decodes a varint encoded string from data, returning the remainder
     110             : // of data and the decoded string. It returns ok=false if the varint is invalid.
     111             : //
     112             : // TODO(jackson): This should be unexported once pebble package callers have
     113             : // been updated to use appropriate abstractions.
     114           1 : func DecodeStr(data []byte) (odata []byte, s []byte, ok bool) {
     115           1 :         // TODO(jackson): This will index out of bounds if there's no varint or an
     116           1 :         // invalid varint (eg, a single 0xff byte). Correcting will add a bit of
     117           1 :         // overhead. We could avoid that overhead whenever len(data) >=
     118           1 :         // binary.MaxVarint32?
     119           1 : 
     120           1 :         var v uint32
     121           1 :         var n int
     122           1 :         ptr := unsafe.Pointer(&data[0])
     123           1 :         if a := *((*uint8)(ptr)); a < 128 {
     124           1 :                 v = uint32(a)
     125           1 :                 n = 1
     126           1 :         } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
     127           1 :                 v = uint32(b)<<7 | uint32(a)
     128           1 :                 n = 2
     129           1 :         } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
     130           1 :                 v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     131           1 :                 n = 3
     132           1 :         } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
     133           1 :                 v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     134           1 :                 n = 4
     135           1 :         } else {
     136           0 :                 d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
     137           0 :                 v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     138           0 :                 n = 5
     139           0 :         }
     140             : 
     141           1 :         data = data[n:]
     142           1 :         if v > uint32(len(data)) {
     143           1 :                 return nil, nil, false
     144           1 :         }
     145           1 :         return data[v:], data[:v], true
     146             : }

Generated by: LCOV version 1.14