LCOV - code coverage report
Current view: top level - pebble/sstable/rowblk - rowblk_writer.go (source / functions) Hit Total Coverage
Test: 2024-08-31 08:15Z 2801a3ea - tests only.lcov Lines: 172 177 97.2 %
Date: 2024-08-31 08:16:19 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package rowblk defines facilities for row-oriented sstable blocks.
       6             : package rowblk
       7             : 
       8             : import (
       9             :         "encoding/binary"
      10             :         "unsafe"
      11             : 
      12             :         "github.com/cockroachdb/errors"
      13             :         "github.com/cockroachdb/pebble/internal/base"
      14             :         "github.com/cockroachdb/pebble/sstable/block"
      15             : )
      16             : 
      17             : const (
      18             :         // MaximumSize is an extremely generous maximum block size of 256MiB. We
      19             :         // explicitly place this limit to reserve a few bits in the restart for internal
      20             :         // use.
      21             :         MaximumSize = 1 << 28
      22             :         // EmptySize holds the size of an empty block. Every block ends in a uint32
      23             :         // trailer encoding the number of restart points within the block.
      24             :         EmptySize = 4
      25             : )
      26             : 
      27             : const (
      28             :         // TrailerObsoleteBit is a bit within the internal key trailer that's used
      29             :         // by the row-oriented block format to signify when a key is obsolete. It's
      30             :         // internal to the row-oriented block format, set when writing a block and
      31             :         // unset by blockIter, so no code outside block writing/reading code ever
      32             :         // sees it.
      33             :         //
      34             :         // TODO(jackson): Unexport once the blockIter is also in this package.
      35             :         TrailerObsoleteBit = base.InternalKeyTrailer(base.InternalKeyKindSSTableInternalObsoleteBit)
      36             :         // TrailerObsoleteMask defines a mask for the obsolete bit in the internal
      37             :         // key trailer.
      38             :         TrailerObsoleteMask = (base.InternalKeyTrailer(base.SeqNumMax) << 8) | base.InternalKeyTrailer(base.InternalKeyKindSSTableInternalObsoleteMask)
      39             : )
      40             : 
      41             : // Writer buffers and serializes key/value pairs into a row-oriented block.
      42             : type Writer struct {
      43             :         // RestartInterval configures the interval at which the writer will write a
      44             :         // full key without prefix compression, and encode a corresponding restart
      45             :         // point.
      46             :         RestartInterval int
      47             :         nEntries        int
      48             :         nextRestart     int
      49             :         buf             []byte
      50             :         // For datablocks in TableFormatPebblev3, we steal the most significant bit
      51             :         // in restarts for encoding setHasSameKeyPrefixSinceLastRestart. This leaves
      52             :         // us with 31 bits, which is more than enough (no one needs > 2GB blocks).
      53             :         // Typically, restarts occur every 16 keys, and by storing this bit with the
      54             :         // restart, we can optimize for the case where a user wants to skip to the
      55             :         // next prefix which happens to be in the same data block, but is > 16 keys
      56             :         // away. We have seen production situations with 100+ versions per MVCC key
      57             :         // (which share the same prefix). Additionally, for such writers, the prefix
      58             :         // compression of the key, that shares the key with the preceding key, is
      59             :         // limited to the prefix part of the preceding key -- this ensures that when
      60             :         // doing NPrefix (see blockIter) we don't need to assemble the full key
      61             :         // for each step since by limiting the length of the shared key we are
      62             :         // ensuring that any of the keys with the same prefix can be used to
      63             :         // assemble the full key when the prefix does change.
      64             :         restarts []uint32
      65             :         // Do not read curKey directly from outside Writer since it can have
      66             :         // the InternalKeyKindSSTableInternalObsoleteBit set. Use CurKey() or
      67             :         // CurUserKey() instead.
      68             :         curKey []byte
      69             :         // curValue excludes the optional prefix provided to
      70             :         // storeWithOptionalValuePrefix.
      71             :         curValue []byte
      72             :         prevKey  []byte
      73             :         tmp      [4]byte
      74             :         // We don't know the state of the sets that were at the end of the previous
      75             :         // block, so this is initially false. It may be true for the second and later
      76             :         // restarts in a block. Not having inter-block information is fine since we
      77             :         // will optimize by stepping through restarts only within the same block.
      78             :         // Note that the first restart is the first key in the block.
      79             :         setHasSameKeyPrefixSinceLastRestart bool
      80             : }
      81             : 
      82             : // Reset resets the block writer to empty, preserving buffers for reuse.
      83           1 : func (w *Writer) Reset() {
      84           1 :         *w = Writer{
      85           1 :                 buf:      w.buf[:0],
      86           1 :                 restarts: w.restarts[:0],
      87           1 :                 curKey:   w.curKey[:0],
      88           1 :                 curValue: w.curValue[:0],
      89           1 :                 prevKey:  w.prevKey[:0],
      90           1 :         }
      91           1 : }
      92             : 
      93             : const setHasSameKeyPrefixRestartMask uint32 = 1 << 31
      94             : 
      95             : // EntryCount returns the count of entries written to the writer.
      96           1 : func (w *Writer) EntryCount() int {
      97           1 :         return w.nEntries
      98           1 : }
      99             : 
     100             : // CurKey returns the most recently written key.
     101           1 : func (w *Writer) CurKey() base.InternalKey {
     102           1 :         k := base.DecodeInternalKey(w.curKey)
     103           1 :         k.Trailer = k.Trailer & TrailerObsoleteMask
     104           1 :         return k
     105           1 : }
     106             : 
     107             : // CurValue returns the most recently written value.
     108           1 : func (w *Writer) CurValue() []byte {
     109           1 :         return w.curValue
     110           1 : }
     111             : 
     112             : // CurUserKey returns the most recently written user key.
     113           1 : func (w *Writer) CurUserKey() []byte {
     114           1 :         n := len(w.curKey) - base.InternalTrailerLen
     115           1 :         if n < 0 {
     116           0 :                 panic(errors.AssertionFailedf("corrupt key in blockWriter buffer"))
     117             :         }
     118           1 :         return w.curKey[:n:n]
     119             : }
     120             : 
     121             : // If !addValuePrefix, the valuePrefix is ignored.
     122             : func (w *Writer) storeWithOptionalValuePrefix(
     123             :         keySize int,
     124             :         value []byte,
     125             :         maxSharedKeyLen int,
     126             :         addValuePrefix bool,
     127             :         valuePrefix block.ValuePrefix,
     128             :         setHasSameKeyPrefix bool,
     129           1 : ) {
     130           1 :         shared := 0
     131           1 :         if !setHasSameKeyPrefix {
     132           1 :                 w.setHasSameKeyPrefixSinceLastRestart = false
     133           1 :         }
     134           1 :         if w.nEntries == w.nextRestart {
     135           1 :                 w.nextRestart = w.nEntries + w.RestartInterval
     136           1 :                 restart := uint32(len(w.buf))
     137           1 :                 if w.setHasSameKeyPrefixSinceLastRestart {
     138           1 :                         restart = restart | setHasSameKeyPrefixRestartMask
     139           1 :                 }
     140           1 :                 w.setHasSameKeyPrefixSinceLastRestart = true
     141           1 :                 w.restarts = append(w.restarts, restart)
     142           1 :         } else {
     143           1 :                 // TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
     144           1 :                 // is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
     145           1 :                 // show this to not be a performance win. For now, functions that use
     146           1 :                 // unsafe cannot be inlined.
     147           1 :                 n := maxSharedKeyLen
     148           1 :                 if n > len(w.prevKey) {
     149           1 :                         n = len(w.prevKey)
     150           1 :                 }
     151           1 :                 asUint64 := func(b []byte, i int) uint64 {
     152           1 :                         return binary.LittleEndian.Uint64(b[i:])
     153           1 :                 }
     154           1 :                 for shared < n-7 && asUint64(w.curKey, shared) == asUint64(w.prevKey, shared) {
     155           1 :                         shared += 8
     156           1 :                 }
     157           1 :                 for shared < n && w.curKey[shared] == w.prevKey[shared] {
     158           1 :                         shared++
     159           1 :                 }
     160             :         }
     161             : 
     162           1 :         lenValuePlusOptionalPrefix := len(value)
     163           1 :         if addValuePrefix {
     164           1 :                 lenValuePlusOptionalPrefix++
     165           1 :         }
     166           1 :         needed := 3*binary.MaxVarintLen32 + len(w.curKey[shared:]) + lenValuePlusOptionalPrefix
     167           1 :         n := len(w.buf)
     168           1 :         if cap(w.buf) < n+needed {
     169           1 :                 newCap := 2 * cap(w.buf)
     170           1 :                 if newCap == 0 {
     171           1 :                         newCap = 1024
     172           1 :                 }
     173           1 :                 for newCap < n+needed {
     174           1 :                         newCap *= 2
     175           1 :                 }
     176           1 :                 newBuf := make([]byte, n, newCap)
     177           1 :                 copy(newBuf, w.buf)
     178           1 :                 w.buf = newBuf
     179             :         }
     180           1 :         w.buf = w.buf[:n+needed]
     181           1 : 
     182           1 :         // TODO(peter): Manually inlined versions of binary.PutUvarint(). This is 15%
     183           1 :         // faster on BenchmarkWriter on go1.13. Remove if go1.14 or future versions
     184           1 :         // show this to not be a performance win.
     185           1 :         {
     186           1 :                 x := uint32(shared)
     187           1 :                 for x >= 0x80 {
     188           0 :                         w.buf[n] = byte(x) | 0x80
     189           0 :                         x >>= 7
     190           0 :                         n++
     191           0 :                 }
     192           1 :                 w.buf[n] = byte(x)
     193           1 :                 n++
     194             :         }
     195             : 
     196           1 :         {
     197           1 :                 x := uint32(keySize - shared)
     198           1 :                 for x >= 0x80 {
     199           1 :                         w.buf[n] = byte(x) | 0x80
     200           1 :                         x >>= 7
     201           1 :                         n++
     202           1 :                 }
     203           1 :                 w.buf[n] = byte(x)
     204           1 :                 n++
     205             :         }
     206             : 
     207           1 :         {
     208           1 :                 x := uint32(lenValuePlusOptionalPrefix)
     209           1 :                 for x >= 0x80 {
     210           1 :                         w.buf[n] = byte(x) | 0x80
     211           1 :                         x >>= 7
     212           1 :                         n++
     213           1 :                 }
     214           1 :                 w.buf[n] = byte(x)
     215           1 :                 n++
     216             :         }
     217             : 
     218           1 :         n += copy(w.buf[n:], w.curKey[shared:])
     219           1 :         if addValuePrefix {
     220           1 :                 w.buf[n : n+1][0] = byte(valuePrefix)
     221           1 :                 n++
     222           1 :         }
     223           1 :         n += copy(w.buf[n:], value)
     224           1 :         w.buf = w.buf[:n]
     225           1 : 
     226           1 :         w.curValue = w.buf[n-len(value):]
     227           1 : 
     228           1 :         w.nEntries++
     229             : }
     230             : 
     231             : // Add adds a key value pair to the block without a value prefix.
     232           1 : func (w *Writer) Add(key base.InternalKey, value []byte) {
     233           1 :         w.AddWithOptionalValuePrefix(
     234           1 :                 key, false, value, len(key.UserKey), false, 0, false)
     235           1 : }
     236             : 
     237             : // AddWithOptionalValuePrefix adds a key value pair to the block, optionally
     238             : // including a value prefix.
     239             : //
     240             : // Callers that always set addValuePrefix to false should use Add() instead.
     241             : //
     242             : // isObsolete indicates whether this key-value pair is obsolete in this
     243             : // sstable (only applicable when writing data blocks) -- see the comment in
     244             : // table.go and the longer one in format.go. addValuePrefix adds a 1 byte
     245             : // prefix to the value, specified in valuePrefix -- this is used for data
     246             : // blocks in TableFormatPebblev3 onwards for SETs (see the comment in
     247             : // format.go, with more details in value_block.go). setHasSameKeyPrefix is
     248             : // also used in TableFormatPebblev3 onwards for SETs.
     249             : func (w *Writer) AddWithOptionalValuePrefix(
     250             :         key base.InternalKey,
     251             :         isObsolete bool,
     252             :         value []byte,
     253             :         maxSharedKeyLen int,
     254             :         addValuePrefix bool,
     255             :         valuePrefix block.ValuePrefix,
     256             :         setHasSameKeyPrefix bool,
     257           1 : ) {
     258           1 :         w.curKey, w.prevKey = w.prevKey, w.curKey
     259           1 : 
     260           1 :         size := key.Size()
     261           1 :         if cap(w.curKey) < size {
     262           1 :                 w.curKey = make([]byte, 0, size*2)
     263           1 :         }
     264           1 :         w.curKey = w.curKey[:size]
     265           1 :         if isObsolete {
     266           1 :                 key.Trailer = key.Trailer | TrailerObsoleteBit
     267           1 :         }
     268           1 :         key.Encode(w.curKey)
     269           1 : 
     270           1 :         w.storeWithOptionalValuePrefix(
     271           1 :                 size, value, maxSharedKeyLen, addValuePrefix, valuePrefix, setHasSameKeyPrefix)
     272             : }
     273             : 
     274             : // Finish finalizes the block, serializes it and returns the serialized data.
     275           1 : func (w *Writer) Finish() []byte {
     276           1 :         // Write the restart points to the buffer.
     277           1 :         if w.nEntries == 0 {
     278           1 :                 // Every block must have at least one restart point.
     279           1 :                 if cap(w.restarts) > 0 {
     280           1 :                         w.restarts = w.restarts[:1]
     281           1 :                         w.restarts[0] = 0
     282           1 :                 } else {
     283           1 :                         w.restarts = append(w.restarts, 0)
     284           1 :                 }
     285             :         }
     286           1 :         tmp4 := w.tmp[:4]
     287           1 :         for _, x := range w.restarts {
     288           1 :                 binary.LittleEndian.PutUint32(tmp4, x)
     289           1 :                 w.buf = append(w.buf, tmp4...)
     290           1 :         }
     291           1 :         binary.LittleEndian.PutUint32(tmp4, uint32(len(w.restarts)))
     292           1 :         w.buf = append(w.buf, tmp4...)
     293           1 :         result := w.buf
     294           1 : 
     295           1 :         // Reset the block state.
     296           1 :         w.nEntries = 0
     297           1 :         w.nextRestart = 0
     298           1 :         w.buf = w.buf[:0]
     299           1 :         w.restarts = w.restarts[:0]
     300           1 :         return result
     301             : }
     302             : 
     303             : // EstimatedSize returns the estimated size of the block in bytes.
     304           1 : func (w *Writer) EstimatedSize() int {
     305           1 :         return len(w.buf) + 4*len(w.restarts) + EmptySize
     306           1 : }
     307             : 
     308             : // AddRaw adds a key value pair to the block.
     309           1 : func (w *Writer) AddRaw(key, value []byte) {
     310           1 :         w.curKey, w.prevKey = w.prevKey, w.curKey
     311           1 : 
     312           1 :         size := len(key)
     313           1 :         if cap(w.curKey) < size {
     314           1 :                 w.curKey = make([]byte, 0, size*2)
     315           1 :         }
     316           1 :         w.curKey = w.curKey[:size]
     317           1 :         copy(w.curKey, key)
     318           1 :         w.storeWithOptionalValuePrefix(
     319           1 :                 size, value, len(key), false, 0, false)
     320             : }
     321             : 
     322             : // AddRawString is AddRaw but with a string key.
     323           1 : func (w *Writer) AddRawString(key string, value []byte) {
     324           1 :         w.AddRaw(unsafe.Slice(unsafe.StringData(key), len(key)), value)
     325           1 : }

Generated by: LCOV version 1.14