LCOV - code coverage report
Current view: top level - pebble/sstable - block_writer.go (source / functions) Hit Total Coverage
Test: 2024-06-11 08:16Z 9b0824f2 - meta test only.lcov Lines: 148 157 94.3 %
Date: 2024-06-11 08:17:23 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "encoding/binary"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             : )
      13             : 
// blockWriter accumulates prefix-compressed key/value entries in buf and, on
// finish(), appends the restart-point array and restart count to produce an
// encoded block.
type blockWriter struct {
	// restartInterval is the number of entries between restart points.
	// nEntries counts the entries stored since the last finish(), and
	// nextRestart is the value of nEntries at which the next restart point is
	// recorded. buf holds the encoded entries of the block being built.
	restartInterval int
	nEntries        int
	nextRestart     int
	buf             []byte
	// restarts holds the offset into buf of each restart point.
	//
	// For datablocks in TableFormatPebblev3, we steal the most significant bit
	// in restarts for encoding setHasSameKeyPrefixSinceLastRestart. This leaves
	// us with 31 bits, which is more than enough (no one needs > 2GB blocks).
	// Typically, restarts occur every 16 keys, and by storing this bit with the
	// restart, we can optimize for the case where a user wants to skip to the
	// next prefix which happens to be in the same data block, but is > 16 keys
	// away. We have seen production situations with 100+ versions per MVCC key
	// (which share the same prefix). Additionally, for such writers, the prefix
	// compression of the key, that shares the key with the preceding key, is
	// limited to the prefix part of the preceding key -- this ensures that when
	// doing NPrefix (see blockIter) we don't need to assemble the full key
	// for each step since by limiting the length of the shared key we are
	// ensuring that any of the keys with the same prefix can be used to
	// assemble the full key when the prefix does change.
	restarts []uint32
	// curKey is the encoded form of the most recently added key.
	//
	// Do not read curKey directly from outside blockWriter since it can have
	// the InternalKeyKindSSTableInternalObsoleteBit set. Use getCurKey() or
	// getCurUserKey() instead.
	curKey []byte
	// curValue is the most recently stored value. It aliases the tail of buf
	// and excludes the optional prefix provided to
	// storeWithOptionalValuePrefix.
	curValue []byte
	prevKey  []byte  // encoded key added before curKey; the two buffers are swapped on each add
	tmp      [4]byte // scratch space used by finish() to encode uint32 values
	// setHasSameKeyPrefixSinceLastRestart tracks whether every entry stored
	// since the last restart point was stored with setHasSameKeyPrefix=true.
	//
	// We don't know the state of the sets that were at the end of the previous
	// block, so this is initially false. It may be true for the second and
	// later restarts in a block. Not having inter-block information is fine
	// since we will optimize by stepping through restarts only within the same
	// block. Note that the first restart is the first key in the block.
	setHasSameKeyPrefixSinceLastRestart bool
}
      50             : 
      51           1 : func (w *blockWriter) clear() {
      52           1 :         *w = blockWriter{
      53           1 :                 buf:      w.buf[:0],
      54           1 :                 restarts: w.restarts[:0],
      55           1 :                 curKey:   w.curKey[:0],
      56           1 :                 curValue: w.curValue[:0],
      57           1 :                 prevKey:  w.prevKey[:0],
      58           1 :         }
      59           1 : }
      60             : 
      61             : // MaximumBlockSize is an extremely generous maximum block size of 256MiB. We
      62             : // explicitly place this limit to reserve a few bits in the restart for
      63             : // internal use.
      64             : const MaximumBlockSize = 1 << 28
      65             : const setHasSameKeyPrefixRestartMask uint32 = 1 << 31
      66             : const restartMaskLittleEndianHighByteWithoutSetHasSamePrefix byte = 0b0111_1111
      67             : const restartMaskLittleEndianHighByteOnlySetHasSamePrefix byte = 0b1000_0000
      68             : 
      69           1 : func (w *blockWriter) getCurKey() InternalKey {
      70           1 :         k := base.DecodeInternalKey(w.curKey)
      71           1 :         k.Trailer = k.Trailer & trailerObsoleteMask
      72           1 :         return k
      73           1 : }
      74             : 
      75           1 : func (w *blockWriter) getCurUserKey() []byte {
      76           1 :         n := len(w.curKey) - base.InternalTrailerLen
      77           1 :         if n < 0 {
      78           0 :                 panic(errors.AssertionFailedf("corrupt key in blockWriter buffer"))
      79             :         }
      80           1 :         return w.curKey[:n:n]
      81             : }
      82             : 
// storeWithOptionalValuePrefix appends the entry for w.curKey and value to
// w.buf. The entry is laid out as:
//
//	varint(shared) varint(keySize-shared) varint(value length, incl. prefix)
//	unshared key bytes | optional 1-byte value prefix | value bytes
//
// where shared is the number of leading bytes that w.curKey has in common with
// w.prevKey, capped at maxSharedKeyLen. When the entry falls on a restart
// point, shared is 0 and the entry's offset is appended to w.restarts.
//
// keySize is the encoded length of the current key; it is used to compute the
// unshared length. If !addValuePrefix, the valuePrefix is ignored.
// setHasSameKeyPrefix=false clears setHasSameKeyPrefixSinceLastRestart, which
// determines whether the next restart point is tagged with
// setHasSameKeyPrefixRestartMask.
//
// On return, w.curValue aliases the value bytes just written to w.buf and
// w.nEntries has been incremented.
//
// NOTE(review): the prefix comparison indexes w.curKey up to
// min(maxSharedKeyLen, len(w.prevKey)) without checking len(w.curKey), so
// callers are presumably required to pass maxSharedKeyLen <= len(w.curKey) --
// confirm against callers.
func (w *blockWriter) storeWithOptionalValuePrefix(
	keySize int,
	value []byte,
	maxSharedKeyLen int,
	addValuePrefix bool,
	valuePrefix valuePrefix,
	setHasSameKeyPrefix bool,
) {
	shared := 0
	// This must happen before the restart check below so that the current
	// entry is accounted for in the bit recorded with the restart.
	if !setHasSameKeyPrefix {
		w.setHasSameKeyPrefixSinceLastRestart = false
	}
	if w.nEntries == w.nextRestart {
		// Restart point: the key is stored in full (shared stays 0) and the
		// entry's offset is recorded.
		w.nextRestart = w.nEntries + w.restartInterval
		restart := uint32(len(w.buf))
		if w.setHasSameKeyPrefixSinceLastRestart {
			restart = restart | setHasSameKeyPrefixRestartMask
		}
		// Start tracking afresh for the interval that begins here.
		w.setHasSameKeyPrefixSinceLastRestart = true
		w.restarts = append(w.restarts, restart)
	} else {
		// TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
		// is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
		// show this to not be a performance win. For now, functions that use
		// unsafe cannot be inlined.
		n := maxSharedKeyLen
		if n > len(w.prevKey) {
			n = len(w.prevKey)
		}
		asUint64 := func(b []byte, i int) uint64 {
			return binary.LittleEndian.Uint64(b[i:])
		}
		// Compare 8 bytes at a time while at least 8 bytes remain below n,
		// then finish byte by byte.
		for shared < n-7 && asUint64(w.curKey, shared) == asUint64(w.prevKey, shared) {
			shared += 8
		}
		for shared < n && w.curKey[shared] == w.prevKey[shared] {
			shared++
		}
	}

	lenValuePlusOptionalPrefix := len(value)
	if addValuePrefix {
		lenValuePlusOptionalPrefix++
	}
	// needed is an upper bound on the entry size: three varints at their
	// maximum 32-bit width plus the unshared key bytes and the value. The
	// buffer is trimmed to the bytes actually written at the end.
	needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + lenValuePlusOptionalPrefix
	n := len(w.buf)
	if cap(w.buf) < n+needed {
		// Grow by doubling (starting from 1024) until the entry fits.
		newCap := 2 * cap(w.buf)
		if newCap == 0 {
			newCap = 1024
		}
		for newCap < n+needed {
			newCap *= 2
		}
		newBuf := make([]byte, n, newCap)
		copy(newBuf, w.buf)
		w.buf = newBuf
	}
	w.buf = w.buf[:n+needed]

	// TODO(peter): Manually inlined versions of binary.PutUvarint(). This is 15%
	// faster on BenchmarkWriter on go1.13. Remove if go1.14 or future versions
	// show this to not be a performance win.
	{
		// Shared key length.
		x := uint32(shared)
		for x >= 0x80 {
			w.buf[n] = byte(x) | 0x80
			x >>= 7
			n++
		}
		w.buf[n] = byte(x)
		n++
	}

	{
		// Unshared key length.
		x := uint32(keySize - shared)
		for x >= 0x80 {
			w.buf[n] = byte(x) | 0x80
			x >>= 7
			n++
		}
		w.buf[n] = byte(x)
		n++
	}

	{
		// Value length, including the optional 1-byte prefix.
		x := uint32(lenValuePlusOptionalPrefix)
		for x >= 0x80 {
			w.buf[n] = byte(x) | 0x80
			x >>= 7
			n++
		}
		w.buf[n] = byte(x)
		n++
	}

	n += copy(w.buf[n:], w.curKey[shared:])
	if addValuePrefix {
		w.buf[n : n+1][0] = byte(valuePrefix)
		n++
	}
	n += copy(w.buf[n:], value)
	// Drop the unused part of the reservation made above.
	w.buf = w.buf[:n]

	// curValue aliases the value bytes in buf, excluding any prefix byte.
	w.curValue = w.buf[n-len(value):]

	w.nEntries++
}
     192             : 
     193           1 : func (w *blockWriter) add(key InternalKey, value []byte) {
     194           1 :         w.addWithOptionalValuePrefix(
     195           1 :                 key, false, value, len(key.UserKey), false, 0, false)
     196           1 : }
     197             : 
     198             : // Callers that always set addValuePrefix to false should use add() instead.
     199             : //
     200             : // isObsolete indicates whether this key-value pair is obsolete in this
     201             : // sstable (only applicable when writing data blocks) -- see the comment in
     202             : // table.go and the longer one in format.go. addValuePrefix adds a 1 byte
     203             : // prefix to the value, specified in valuePrefix -- this is used for data
     204             : // blocks in TableFormatPebblev3 onwards for SETs (see the comment in
     205             : // format.go, with more details in value_block.go). setHasSameKeyPrefix is
     206             : // also used in TableFormatPebblev3 onwards for SETs.
     207             : func (w *blockWriter) addWithOptionalValuePrefix(
     208             :         key InternalKey,
     209             :         isObsolete bool,
     210             :         value []byte,
     211             :         maxSharedKeyLen int,
     212             :         addValuePrefix bool,
     213             :         valuePrefix valuePrefix,
     214             :         setHasSameKeyPrefix bool,
     215           1 : ) {
     216           1 :         w.curKey, w.prevKey = w.prevKey, w.curKey
     217           1 : 
     218           1 :         size := key.Size()
     219           1 :         if cap(w.curKey) < size {
     220           1 :                 w.curKey = make([]byte, 0, size*2)
     221           1 :         }
     222           1 :         w.curKey = w.curKey[:size]
     223           1 :         if isObsolete {
     224           1 :                 key.Trailer = key.Trailer | trailerObsoleteBit
     225           1 :         }
     226           1 :         key.Encode(w.curKey)
     227           1 : 
     228           1 :         w.storeWithOptionalValuePrefix(
     229           1 :                 size, value, maxSharedKeyLen, addValuePrefix, valuePrefix, setHasSameKeyPrefix)
     230             : }
     231             : 
     232           1 : func (w *blockWriter) finish() []byte {
     233           1 :         // Write the restart points to the buffer.
     234           1 :         if w.nEntries == 0 {
     235           1 :                 // Every block must have at least one restart point.
     236           1 :                 if cap(w.restarts) > 0 {
     237           1 :                         w.restarts = w.restarts[:1]
     238           1 :                         w.restarts[0] = 0
     239           1 :                 } else {
     240           1 :                         w.restarts = append(w.restarts, 0)
     241           1 :                 }
     242             :         }
     243           1 :         tmp4 := w.tmp[:4]
     244           1 :         for _, x := range w.restarts {
     245           1 :                 binary.LittleEndian.PutUint32(tmp4, x)
     246           1 :                 w.buf = append(w.buf, tmp4...)
     247           1 :         }
     248           1 :         binary.LittleEndian.PutUint32(tmp4, uint32(len(w.restarts)))
     249           1 :         w.buf = append(w.buf, tmp4...)
     250           1 :         result := w.buf
     251           1 : 
     252           1 :         // Reset the block state.
     253           1 :         w.nEntries = 0
     254           1 :         w.nextRestart = 0
     255           1 :         w.buf = w.buf[:0]
     256           1 :         w.restarts = w.restarts[:0]
     257           1 :         return result
     258             : }
     259             : 
     260             : // emptyBlockSize holds the size of an empty block. Every block ends
     261             : // in a uint32 trailer encoding the number of restart points within the
     262             : // block.
     263             : const emptyBlockSize = 4
     264             : 
     265           1 : func (w *blockWriter) estimatedSize() int {
     266           1 :         return len(w.buf) + 4*len(w.restarts) + emptyBlockSize
     267           1 : }

Generated by: LCOV version 1.14