LCOV - code coverage report
Current view: top level - pebble/sstable/rowblk - rowblk_writer.go (source / functions) Hit Total Coverage
Test: 2024-07-14 08:16Z 9a4ea4df - tests + meta.lcov Lines: 172 177 97.2 %
Date: 2024-07-14 08:17:27 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package rowblk defines facilities for row-oriented sstable blocks.
       6             : package rowblk
       7             : 
       8             : import (
       9             :         "encoding/binary"
      10             :         "unsafe"
      11             : 
      12             :         "github.com/cockroachdb/errors"
      13             :         "github.com/cockroachdb/pebble/internal/base"
      14             :         "github.com/cockroachdb/pebble/sstable/block"
      15             : )
      16             : 
const (
	// MaximumSize is an extremely generous maximum block size of 256MiB
	// (1<<28 bytes). We explicitly place this limit to reserve a few bits in
	// the restart points for internal use.
	MaximumSize = 1 << 28
	// EmptySize holds the size of an empty block. Every block ends in a uint32
	// trailer encoding the number of restart points within the block; this
	// constant is the size of that trailer.
	//
	// NOTE(review): Writer.Finish on a writer with no entries still emits one
	// restart point (4 bytes) in addition to this trailer, i.e. 8 bytes total.
	EmptySize = 4
)

const (
	// TrailerObsoleteBit is a bit within the internal key trailer that's used
	// by the row-oriented block format to signify when a key is obsolete. It's
	// internal to the row-oriented block format, set when writing a block and
	// unset by blockIter, so no code outside block writing/reading code ever
	// sees it.
	//
	// TODO(jackson): Unexport once the blockIter is also in this package.
	TrailerObsoleteBit = base.InternalKeyTrailer(base.InternalKeyKindSSTableInternalObsoleteBit)
	// TrailerObsoleteMask defines a mask for the obsolete bit in the internal
	// key trailer. Writer.CurKey ANDs the stored trailer with this mask before
	// returning the key. It combines the sequence-number bits (SeqNumMax
	// shifted above the 8 kind bits) with InternalKeyKindSSTableInternalObsoleteMask
	// for the kind byte (presumably every kind bit except the obsolete bit —
	// confirm against package base).
	TrailerObsoleteMask = (base.InternalKeyTrailer(base.SeqNumMax) << 8) | base.InternalKeyTrailer(base.InternalKeyKindSSTableInternalObsoleteMask)
)
      40             : 
// Writer buffers and serializes key/value pairs into a row-oriented block.
//
// Each added key is prefix-compressed against the previously added key,
// except at restart points (every RestartInterval entries). At a restart point
// the key is stored in full and the entry's byte offset is recorded. Finish
// appends the restart points and their count to produce the serialized block.
type Writer struct {
	// RestartInterval configures the interval at which the writer will write a
	// full key without prefix compression, and encode a corresponding restart
	// point.
	RestartInterval int
	// nEntries counts the entries added since the last Finish or Reset.
	nEntries int
	// nextRestart is the value of nEntries at which the next restart point
	// will be written.
	nextRestart int
	// buf accumulates the encoded entries of the block being built.
	buf []byte
	// restarts holds the byte offset within buf of each restart point.
	//
	// For datablocks in TableFormatPebblev3, we steal the most significant bit
	// in restarts for encoding setHasSameKeyPrefixSinceLastRestart. This leaves
	// us with 31 bits, which is more than enough (no one needs > 2GB blocks).
	// Typically, restarts occur every 16 keys, and by storing this bit with the
	// restart, we can optimize for the case where a user wants to skip to the
	// next prefix which happens to be in the same data block, but is > 16 keys
	// away. We have seen production situations with 100+ versions per MVCC key
	// (which share the same prefix). Additionally, for such writers, the prefix
	// compression of the key, that shares the key with the preceding key, is
	// limited to the prefix part of the preceding key -- this ensures that when
	// doing NPrefix (see blockIter) we don't need to assemble the full key
	// for each step since by limiting the length of the shared key we are
	// ensuring that any of the keys with the same prefix can be used to
	// assemble the full key when the prefix does change.
	restarts []uint32
	// curKey holds the encoded internal key most recently added.
	//
	// Do not read curKey directly from outside the Writer since it can have
	// the InternalKeyKindSSTableInternalObsoleteBit set. Use CurKey() or
	// CurUserKey() instead.
	curKey []byte
	// curValue is the value most recently added; it aliases buf. curValue
	// excludes the optional prefix provided to storeWithOptionalValuePrefix.
	curValue []byte
	// prevKey holds the encoded key added before curKey. The shared-prefix
	// length of each new entry is computed against it. On every add its buffer
	// is swapped with curKey's so both allocations are reused.
	prevKey []byte
	// tmp is scratch space used by Finish to encode little-endian uint32s.
	tmp [4]byte
	// setHasSameKeyPrefixSinceLastRestart is true while every entry added since
	// the most recent restart point was added with setHasSameKeyPrefix. When
	// the next restart point is written, its high bit
	// (setHasSameKeyPrefixRestartMask) records this value.
	//
	// We don't know the state of the sets that were at the end of the previous
	// block, so this is initially false. It may be true for the second and
	// later restarts in a block. Not having inter-block information is fine
	// since we will optimize by stepping through restarts only within the same
	// block. Note that the first restart is the first key in the block.
	setHasSameKeyPrefixSinceLastRestart bool
}
      81             : 
      82             : // Reset resets the block writer to empty, preserving buffers for reuse.
      83           2 : func (w *Writer) Reset() {
      84           2 :         *w = Writer{
      85           2 :                 buf:      w.buf[:0],
      86           2 :                 restarts: w.restarts[:0],
      87           2 :                 curKey:   w.curKey[:0],
      88           2 :                 curValue: w.curValue[:0],
      89           2 :                 prevKey:  w.prevKey[:0],
      90           2 :         }
      91           2 : }
      92             : 
// setHasSameKeyPrefixRestartMask is the most significant bit of a uint32
// restart point. When set, it records that every entry after the previous
// restart point was added with setHasSameKeyPrefix, up to and including the
// entry at this restart point. The remaining 31 bits hold the restart's byte
// offset within the block.
const setHasSameKeyPrefixRestartMask uint32 = 1 << 31
      94             : 
      95             : // EntryCount returns the count of entries written to the writer.
      96           2 : func (w *Writer) EntryCount() int {
      97           2 :         return w.nEntries
      98           2 : }
      99             : 
     100             : // CurKey returns the most recently written key.
     101           2 : func (w *Writer) CurKey() base.InternalKey {
     102           2 :         k := base.DecodeInternalKey(w.curKey)
     103           2 :         k.Trailer = k.Trailer & TrailerObsoleteMask
     104           2 :         return k
     105           2 : }
     106             : 
     107             : // CurValue returns the most recently written value.
     108           2 : func (w *Writer) CurValue() []byte {
     109           2 :         return w.curValue
     110           2 : }
     111             : 
     112             : // CurUserKey returns the most recently written user key.
     113           2 : func (w *Writer) CurUserKey() []byte {
     114           2 :         n := len(w.curKey) - base.InternalTrailerLen
     115           2 :         if n < 0 {
     116           0 :                 panic(errors.AssertionFailedf("corrupt key in blockWriter buffer"))
     117             :         }
     118           2 :         return w.curKey[:n:n]
     119             : }
     120             : 
     121             : // If !addValuePrefix, the valuePrefix is ignored.
     122             : func (w *Writer) storeWithOptionalValuePrefix(
     123             :         keySize int,
     124             :         value []byte,
     125             :         maxSharedKeyLen int,
     126             :         addValuePrefix bool,
     127             :         valuePrefix block.ValuePrefix,
     128             :         setHasSameKeyPrefix bool,
     129           2 : ) {
     130           2 :         shared := 0
     131           2 :         if !setHasSameKeyPrefix {
     132           2 :                 w.setHasSameKeyPrefixSinceLastRestart = false
     133           2 :         }
     134           2 :         if w.nEntries == w.nextRestart {
     135           2 :                 w.nextRestart = w.nEntries + w.RestartInterval
     136           2 :                 restart := uint32(len(w.buf))
     137           2 :                 if w.setHasSameKeyPrefixSinceLastRestart {
     138           2 :                         restart = restart | setHasSameKeyPrefixRestartMask
     139           2 :                 }
     140           2 :                 w.setHasSameKeyPrefixSinceLastRestart = true
     141           2 :                 w.restarts = append(w.restarts, restart)
     142           2 :         } else {
     143           2 :                 // TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
     144           2 :                 // is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
     145           2 :                 // show this to not be a performance win. For now, functions that use of
     146           2 :                 // unsafe cannot be inlined.
     147           2 :                 n := maxSharedKeyLen
     148           2 :                 if n > len(w.prevKey) {
     149           2 :                         n = len(w.prevKey)
     150           2 :                 }
     151           2 :                 asUint64 := func(b []byte, i int) uint64 {
     152           2 :                         return binary.LittleEndian.Uint64(b[i:])
     153           2 :                 }
     154           2 :                 for shared < n-7 && asUint64(w.curKey, shared) == asUint64(w.prevKey, shared) {
     155           2 :                         shared += 8
     156           2 :                 }
     157           2 :                 for shared < n && w.curKey[shared] == w.prevKey[shared] {
     158           2 :                         shared++
     159           2 :                 }
     160             :         }
     161             : 
     162           2 :         lenValuePlusOptionalPrefix := len(value)
     163           2 :         if addValuePrefix {
     164           2 :                 lenValuePlusOptionalPrefix++
     165           2 :         }
     166           2 :         needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + lenValuePlusOptionalPrefix
     167           2 :         n := len(w.buf)
     168           2 :         if cap(w.buf) < n+needed {
     169           2 :                 newCap := 2 * cap(w.buf)
     170           2 :                 if newCap == 0 {
     171           2 :                         newCap = 1024
     172           2 :                 }
     173           2 :                 for newCap < n+needed {
     174           2 :                         newCap *= 2
     175           2 :                 }
     176           2 :                 newBuf := make([]byte, n, newCap)
     177           2 :                 copy(newBuf, w.buf)
     178           2 :                 w.buf = newBuf
     179             :         }
     180           2 :         w.buf = w.buf[:n+needed]
     181           2 : 
     182           2 :         // TODO(peter): Manually inlined versions of binary.PutUvarint(). This is 15%
     183           2 :         // faster on BenchmarkWriter on go1.13. Remove if go1.14 or future versions
     184           2 :         // show this to not be a performance win.
     185           2 :         {
     186           2 :                 x := uint32(shared)
     187           2 :                 for x >= 0x80 {
     188           0 :                         w.buf[n] = byte(x) | 0x80
     189           0 :                         x >>= 7
     190           0 :                         n++
     191           0 :                 }
     192           2 :                 w.buf[n] = byte(x)
     193           2 :                 n++
     194             :         }
     195             : 
     196           2 :         {
     197           2 :                 x := uint32(keySize - shared)
     198           2 :                 for x >= 0x80 {
     199           1 :                         w.buf[n] = byte(x) | 0x80
     200           1 :                         x >>= 7
     201           1 :                         n++
     202           1 :                 }
     203           2 :                 w.buf[n] = byte(x)
     204           2 :                 n++
     205             :         }
     206             : 
     207           2 :         {
     208           2 :                 x := uint32(lenValuePlusOptionalPrefix)
     209           2 :                 for x >= 0x80 {
     210           2 :                         w.buf[n] = byte(x) | 0x80
     211           2 :                         x >>= 7
     212           2 :                         n++
     213           2 :                 }
     214           2 :                 w.buf[n] = byte(x)
     215           2 :                 n++
     216             :         }
     217             : 
     218           2 :         n += copy(w.buf[n:], w.curKey[shared:])
     219           2 :         if addValuePrefix {
     220           2 :                 w.buf[n : n+1][0] = byte(valuePrefix)
     221           2 :                 n++
     222           2 :         }
     223           2 :         n += copy(w.buf[n:], value)
     224           2 :         w.buf = w.buf[:n]
     225           2 : 
     226           2 :         w.curValue = w.buf[n-len(value):]
     227           2 : 
     228           2 :         w.nEntries++
     229             : }
     230             : 
     231             : // Add adds a key value pair to the block without a value prefix.
     232           2 : func (w *Writer) Add(key base.InternalKey, value []byte) {
     233           2 :         w.AddWithOptionalValuePrefix(
     234           2 :                 key, false, value, len(key.UserKey), false, 0, false)
     235           2 : }
     236             : 
     237             : // AddWithOptionalValuePrefix adds a key value pair to the block, optionally
     238             : // including a value prefix.
     239             : //
     240             : // Callers that always set addValuePrefix to false should use add() instead.
     241             : //
     242             : // isObsolete indicates whether this key-value pair is obsolete in this
     243             : // sstable (only applicable when writing data blocks) -- see the comment in
     244             : // table.go and the longer one in format.go. addValuePrefix adds a 1 byte
     245             : // prefix to the value, specified in valuePrefix -- this is used for data
     246             : // blocks in TableFormatPebblev3 onwards for SETs (see the comment in
     247             : // format.go, with more details in value_block.go). setHasSameKeyPrefix is
     248             : // also used in TableFormatPebblev3 onwards for SETs.
     249             : func (w *Writer) AddWithOptionalValuePrefix(
     250             :         key base.InternalKey,
     251             :         isObsolete bool,
     252             :         value []byte,
     253             :         maxSharedKeyLen int,
     254             :         addValuePrefix bool,
     255             :         valuePrefix block.ValuePrefix,
     256             :         setHasSameKeyPrefix bool,
     257           2 : ) {
     258           2 :         w.curKey, w.prevKey = w.prevKey, w.curKey
     259           2 : 
     260           2 :         size := key.Size()
     261           2 :         if cap(w.curKey) < size {
     262           2 :                 w.curKey = make([]byte, 0, size*2)
     263           2 :         }
     264           2 :         w.curKey = w.curKey[:size]
     265           2 :         if isObsolete {
     266           2 :                 key.Trailer = key.Trailer | TrailerObsoleteBit
     267           2 :         }
     268           2 :         key.Encode(w.curKey)
     269           2 : 
     270           2 :         w.storeWithOptionalValuePrefix(
     271           2 :                 size, value, maxSharedKeyLen, addValuePrefix, valuePrefix, setHasSameKeyPrefix)
     272             : }
     273             : 
     274             : // Finish finalizes the block, serializes it and returns the serialized data.
     275           2 : func (w *Writer) Finish() []byte {
     276           2 :         // Write the restart points to the buffer.
     277           2 :         if w.nEntries == 0 {
     278           2 :                 // Every block must have at least one restart point.
     279           2 :                 if cap(w.restarts) > 0 {
     280           2 :                         w.restarts = w.restarts[:1]
     281           2 :                         w.restarts[0] = 0
     282           2 :                 } else {
     283           2 :                         w.restarts = append(w.restarts, 0)
     284           2 :                 }
     285             :         }
     286           2 :         tmp4 := w.tmp[:4]
     287           2 :         for _, x := range w.restarts {
     288           2 :                 binary.LittleEndian.PutUint32(tmp4, x)
     289           2 :                 w.buf = append(w.buf, tmp4...)
     290           2 :         }
     291           2 :         binary.LittleEndian.PutUint32(tmp4, uint32(len(w.restarts)))
     292           2 :         w.buf = append(w.buf, tmp4...)
     293           2 :         result := w.buf
     294           2 : 
     295           2 :         // Reset the block state.
     296           2 :         w.nEntries = 0
     297           2 :         w.nextRestart = 0
     298           2 :         w.buf = w.buf[:0]
     299           2 :         w.restarts = w.restarts[:0]
     300           2 :         return result
     301             : }
     302             : 
     303             : // EstimatedSize returns the estimated size of the block in bytes.
     304           2 : func (w *Writer) EstimatedSize() int {
     305           2 :         return len(w.buf) + 4*len(w.restarts) + EmptySize
     306           2 : }
     307             : 
     308             : // AddRaw adds a key value pair to the block.
     309           2 : func (w *Writer) AddRaw(key, value []byte) {
     310           2 :         w.curKey, w.prevKey = w.prevKey, w.curKey
     311           2 : 
     312           2 :         size := len(key)
     313           2 :         if cap(w.curKey) < size {
     314           2 :                 w.curKey = make([]byte, 0, size*2)
     315           2 :         }
     316           2 :         w.curKey = w.curKey[:size]
     317           2 :         copy(w.curKey, key)
     318           2 :         w.storeWithOptionalValuePrefix(
     319           2 :                 size, value, len(key), false, 0, false)
     320             : }
     321             : 
     322             : // AddRawString is AddRaw but with a string key.
     323           2 : func (w *Writer) AddRawString(key string, value []byte) {
     324           2 :         w.AddRaw(unsafe.Slice(unsafe.StringData(key), len(key)), value)
     325           2 : }

Generated by: LCOV version 1.14