LCOV - code coverage report

Current view:  top level - pebble - mem_table.go (source / functions)
Test:          2025-07-02 08:19Z a2947b4a - meta test only.lcov
Test Date:     2025-07-02 08:20:24

                 Coverage      Total      Hit
Lines:           84.1 %        227        191
Functions:       -             0          0

            Line data    Source code
       1              : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "fmt"
      10              :         "os"
      11              :         "sync"
      12              :         "sync/atomic"
      13              : 
      14              :         "github.com/cockroachdb/errors"
      15              :         "github.com/cockroachdb/pebble/internal/arenaskl"
      16              :         "github.com/cockroachdb/pebble/internal/base"
      17              :         "github.com/cockroachdb/pebble/internal/keyspan"
      18              :         "github.com/cockroachdb/pebble/internal/manual"
      19              :         "github.com/cockroachdb/pebble/internal/rangedel"
      20              :         "github.com/cockroachdb/pebble/internal/rangekey"
      21              : )
      22              : 
      23            1 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
      24            1 :         return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
      25            1 : }
      26              : 
      27              : // memTableEmptySize is the amount of allocated space in the arena when the
      28              : // memtable is empty.
      29            1 : var memTableEmptySize = func() uint32 {
      30            1 :         var pointSkl arenaskl.Skiplist
      31            1 :         var rangeDelSkl arenaskl.Skiplist
      32            1 :         var rangeKeySkl arenaskl.Skiplist
      33            1 :         arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
      34            1 :         pointSkl.Reset(arena, bytes.Compare)
      35            1 :         rangeDelSkl.Reset(arena, bytes.Compare)
      36            1 :         rangeKeySkl.Reset(arena, bytes.Compare)
      37            1 :         return arena.Size()
      38            1 : }()
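
For illustration, a minimal sketch (not part of this file) of how a set of records could be charged against the arena using the same pessimistic per-entry bound; the recordSize type and estimateReservation helper are hypothetical, and the real accounting is done by Batch.memTableSize.

// Hypothetical helper, assuming it lives inside the pebble package: charge a
// set of records using the same worst-case node size that prepare() relies on.
type recordSize struct{ keyBytes, valueBytes int }

func estimateReservation(records []recordSize) uint64 {
        var total uint64
        for _, r := range records {
                // Each entry is charged the maximum skiplist node size it
                // could require, matching memTableEntrySize above.
                total += memTableEntrySize(r.keyBytes, r.valueBytes)
        }
        return total
}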
      39              : 
      40              : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
      41              : // but append-only. Records are added, but never removed. Deletion is supported
      42              : // via tombstones, but it is up to higher level code (see Iterator) to support
      43              : // processing those tombstones.
      44              : //
      45              : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
      46              : // arena is a fixed size contiguous chunk of memory (see
      47              : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
      48              : // time of creation (with the exception of the cached fragmented range
      49              : // tombstones). The arena-backed skiplist provides both forward and reverse
      50              : // links which makes forward and reverse iteration the same speed.
      51              : //
      52              : // A batch is "applied" to a memTable in a two step process: prepare(batch) ->
      53              : // apply(batch). memTable.prepare() is not thread-safe and must be called with
      54              : // external synchronization. Preparation reserves space in the memTable for the
      55              : // batch. Note that we pessimistically compute how much space a batch will
      56              : // consume in the memTable (see memTableEntrySize and
      57              : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
      58              : // the memTable can be performed concurrently with other apply
       59              : // operations. Applying a batch is an O(N log M) operation where N is the number
      60              : // of records in the batch and M is the number of records in the memtable. The
      61              : // commitPipeline serializes batch preparation, and allows batch application to
      62              : // proceed concurrently.
      63              : //
      64              : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
      65              : type memTable struct {
      66              :         cmp         Compare
      67              :         formatKey   base.FormatKey
      68              :         equal       Equal
      69              :         arenaBuf    manual.Buf
      70              :         skl         arenaskl.Skiplist
      71              :         rangeDelSkl arenaskl.Skiplist
      72              :         rangeKeySkl arenaskl.Skiplist
      73              :         // reserved tracks the amount of space used by the memtable, both by actual
      74              :         // data stored in the memtable as well as inflight batch commit
      75              :         // operations. This value is incremented pessimistically by prepare() in
      76              :         // order to account for the space needed by a batch.
      77              :         reserved uint32
      78              :         // writerRefs tracks the write references on the memtable. The two sources of
      79              :         // writer references are the memtable being on DB.mu.mem.queue and from
      80              :         // inflight mutations that have reserved space in the memtable but not yet
      81              :         // applied. The memtable cannot be flushed to disk until the writer refs
      82              :         // drops to zero.
      83              :         writerRefs atomic.Int32
      84              :         tombstones keySpanCache
      85              :         rangeKeys  keySpanCache
      86              :         // The current logSeqNum at the time the memtable was created. This is
      87              :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88              :         logSeqNum                    base.SeqNum
      89              :         releaseAccountingReservation func()
      90              : }
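
As a rough illustration of the two-step commit described in the comment above, the sketch below strings prepare, apply, and writerUnref together for a single batch; it is a simplified stand-in for the real commitPipeline, and the sequence number is assumed to have been assigned elsewhere.

// Simplified sketch of the prepare -> apply -> unref sequence; not the real
// commit pipeline. Assumes the caller serializes prepare() externally and has
// already allocated seqNum for the batch.
func commitSketch(m *memTable, b *Batch, seqNum base.SeqNum) error {
        // prepare() is not thread-safe: it must run under whatever mutex or
        // pipeline stage serializes batch preparation.
        if err := m.prepare(b); err != nil {
                return err // e.g. arenaskl.ErrArenaFull
        }
        // apply() may run concurrently with other apply() calls on the same
        // memtable.
        if err := m.apply(b, seqNum); err != nil {
                return err
        }
        // Drop the writer ref taken by prepare(). Once writerRefs reaches
        // zero the memtable becomes eligible for flushing (readyForFlush).
        m.writerUnref()
        return nil
}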
      91              : 
      92            1 : func (m *memTable) free() {
      93            1 :         if m != nil {
      94            1 :                 m.releaseAccountingReservation()
      95            1 :                 manual.Free(manual.MemTable, m.arenaBuf)
      96            1 :                 m.arenaBuf = manual.Buf{}
      97            1 :         }
      98              : }
      99              : 
     100              : // memTableOptions holds configuration used when creating a memTable. All of
      101              : // the fields are optional and will be filled with defaults if not specified,
      102              : // which is relied upon by tests.
     103              : type memTableOptions struct {
     104              :         *Options
     105              :         arenaBuf                     manual.Buf
     106              :         size                         int
     107              :         logSeqNum                    base.SeqNum
     108              :         releaseAccountingReservation func()
     109              : }
     110              : 
     111            1 : func checkMemTable(obj interface{}) {
     112            1 :         m := obj.(*memTable)
     113            1 :         if m.arenaBuf.Data() != nil {
     114            0 :                 fmt.Fprintf(os.Stderr, "%v: memTable buffer was not freed\n", m.arenaBuf)
     115            0 :                 os.Exit(1)
     116            0 :         }
     117              : }
     118              : 
     119              : // newMemTable returns a new MemTable of the specified size. If size is zero,
     120              : // Options.MemTableSize is used instead.
     121            0 : func newMemTable(opts memTableOptions) *memTable {
     122            0 :         if opts.Options == nil {
     123            0 :                 opts.Options = &Options{}
     124            0 :         }
     125            0 :         opts.Options.EnsureDefaults()
     126            0 :         m := new(memTable)
     127            0 :         m.init(opts)
     128            0 :         return m
     129              : }
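
A tiny construction sketch, mirroring the defaulting behavior that the memTableOptions comment describes; all fields are left zero so EnsureDefaults and Options.MemTableSize decide the configuration. The exampleNewMemTable name is illustrative only.

// Illustrative only: build a memtable with every option defaulted, as tests
// are described as doing above.
func exampleNewMemTable() {
        m := newMemTable(memTableOptions{}) // size and comparer come from defaults
        _ = m.empty()                       // a freshly initialized memtable holds no keys
}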
     130              : 
     131            1 : func (m *memTable) init(opts memTableOptions) {
     132            1 :         if opts.size == 0 {
     133            1 :                 opts.size = int(opts.MemTableSize)
     134            1 :         }
     135            1 :         *m = memTable{
     136            1 :                 cmp:                          opts.Comparer.Compare,
     137            1 :                 formatKey:                    opts.Comparer.FormatKey,
     138            1 :                 equal:                        opts.Comparer.Equal,
     139            1 :                 arenaBuf:                     opts.arenaBuf,
     140            1 :                 logSeqNum:                    opts.logSeqNum,
     141            1 :                 releaseAccountingReservation: opts.releaseAccountingReservation,
     142            1 :         }
     143            1 :         m.writerRefs.Store(1)
     144            1 :         m.tombstones = keySpanCache{
     145            1 :                 cmp:           m.cmp,
     146            1 :                 formatKey:     m.formatKey,
     147            1 :                 skl:           &m.rangeDelSkl,
     148            1 :                 constructSpan: rangeDelConstructSpan,
     149            1 :         }
     150            1 :         m.rangeKeys = keySpanCache{
     151            1 :                 cmp:           m.cmp,
     152            1 :                 formatKey:     m.formatKey,
     153            1 :                 skl:           &m.rangeKeySkl,
     154            1 :                 constructSpan: rangekey.Decode,
     155            1 :         }
     156            1 : 
     157            1 :         if m.arenaBuf.Data() == nil {
     158            0 :                 m.arenaBuf = manual.New(manual.MemTable, uintptr(opts.size))
     159            0 :         }
     160              : 
     161            1 :         arena := arenaskl.NewArena(m.arenaBuf.Slice())
     162            1 :         m.skl.Reset(arena, m.cmp)
     163            1 :         m.rangeDelSkl.Reset(arena, m.cmp)
     164            1 :         m.rangeKeySkl.Reset(arena, m.cmp)
     165            1 :         m.reserved = arena.Size()
     166              : }
     167              : 
     168            1 : func (m *memTable) writerRef() {
     169            1 :         switch v := m.writerRefs.Add(1); {
     170            0 :         case v <= 1:
     171            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     172              :         }
     173              : }
     174              : 
     175              : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
     176            1 : func (m *memTable) writerUnref() (wasLastRef bool) {
     177            1 :         switch v := m.writerRefs.Add(-1); {
     178            0 :         case v < 0:
     179            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     180            1 :         case v == 0:
     181            1 :                 return true
     182            1 :         default:
     183            1 :                 return false
     184              :         }
     185              : }
     186              : 
     187              : // readyForFlush is part of the flushable interface.
     188            1 : func (m *memTable) readyForFlush() bool {
     189            1 :         return m.writerRefs.Load() == 0
     190            1 : }
     191              : 
     192              : // Prepare reserves space for the batch in the memtable and references the
      193              : // memtable, preventing it from being flushed until the batch is applied. Note
     194              : // that prepare is not thread-safe, while apply is. The caller must call
     195              : // writerUnref() after the batch has been applied.
     196            1 : func (m *memTable) prepare(batch *Batch) error {
     197            1 :         avail := m.availBytes()
     198            1 :         if batch.memTableSize > uint64(avail) {
     199            1 :                 return arenaskl.ErrArenaFull
     200            1 :         }
     201            1 :         m.reserved += uint32(batch.memTableSize)
     202            1 : 
     203            1 :         m.writerRef()
     204            1 :         return nil
     205              : }
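
The sketch below illustrates the caller-side contract of prepare: on arenaskl.ErrArenaFull no writer ref was taken and the batch has to be retried against a fresh memtable. The reserveWithRotation helper and its rotate callback are hypothetical stand-ins for the DB's real memtable-rotation logic.

// Hypothetical retry loop around prepare(); rotate() stands in for the DB
// switching to a new mutable memtable when the current one is full.
func reserveWithRotation(m *memTable, b *Batch, rotate func() *memTable) (*memTable, error) {
        err := m.prepare(b)
        if err == nil {
                return m, nil
        }
        if errors.Is(err, arenaskl.ErrArenaFull) {
                // prepare() failed before taking a writer ref, so there is
                // nothing to unref; just retry on a fresh memtable.
                m = rotate()
                if err := m.prepare(b); err != nil {
                        return nil, err
                }
                return m, nil
        }
        return nil, err
}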
     206              : 
     207            1 : func (m *memTable) apply(batch *Batch, seqNum base.SeqNum) error {
     208            1 :         if seqNum < m.logSeqNum {
     209            0 :                 return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
     210            0 :                         errors.Safe(seqNum), errors.Safe(m.logSeqNum))
     211            0 :         }
     212              : 
     213            1 :         var ins arenaskl.Inserter
     214            1 :         var tombstoneCount, rangeKeyCount uint32
     215            1 :         startSeqNum := seqNum
     216            1 :         for r := batch.Reader(); ; seqNum++ {
     217            1 :                 kind, ukey, value, ok, err := r.Next()
     218            1 :                 if !ok {
     219            1 :                         if err != nil {
     220            0 :                                 return err
     221            0 :                         }
     222            1 :                         break
     223              :                 }
     224            1 :                 ikey := base.MakeInternalKey(ukey, seqNum, kind)
     225            1 :                 switch kind {
     226            1 :                 case InternalKeyKindRangeDelete:
     227            1 :                         err = m.rangeDelSkl.Add(ikey, value)
     228            1 :                         tombstoneCount++
     229            1 :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     230            1 :                         err = m.rangeKeySkl.Add(ikey, value)
     231            1 :                         rangeKeyCount++
     232            1 :                 case InternalKeyKindLogData:
     233            1 :                         // Don't increment seqNum for LogData, since these are not applied
     234            1 :                         // to the memtable.
     235            1 :                         seqNum--
     236            0 :                 case InternalKeyKindIngestSST, InternalKeyKindExcise:
     237            0 :                         panic("pebble: cannot apply ingested sstable or excise kind keys to memtable")
     238            1 :                 default:
     239            1 :                         err = ins.Add(&m.skl, ikey, value)
     240              :                 }
     241            1 :                 if err != nil {
     242            0 :                         return err
     243            0 :                 }
     244              :         }
     245            1 :         if seqNum != startSeqNum+base.SeqNum(batch.Count()) {
     246            0 :                 return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
     247            0 :                         errors.Safe(seqNum), errors.Safe(startSeqNum+base.SeqNum(batch.Count())))
     248            0 :         }
     249            1 :         if tombstoneCount != 0 {
     250            1 :                 m.tombstones.invalidate(tombstoneCount)
     251            1 :         }
     252            1 :         if rangeKeyCount != 0 {
     253            1 :                 m.rangeKeys.invalidate(rangeKeyCount)
     254            1 :         }
     255            1 :         return nil
     256              : }
     257              : 
     258              : // newIter is part of the flushable interface. It returns an iterator that is
     259              : // unpositioned (Iterator.Valid() will return false). The iterator can be
     260              : // positioned via a call to SeekGE, SeekLT, First or Last.
     261            1 : func (m *memTable) newIter(o *IterOptions) internalIterator {
     262            1 :         return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
     263            1 : }
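
A small sketch of consuming the unpositioned iterator returned by newIter; the First/Next pattern over *base.InternalKV mirrors the loop used by keySpanFrags.get later in this file, and passing a nil *IterOptions simply means no bounds. The scanMemTable name and visit callback are illustrative.

// Illustrative scan over the memtable's point keys. The iterator starts
// unpositioned; First moves it to the smallest key.
func scanMemTable(m *memTable, visit func(k base.InternalKey, v []byte)) error {
        it := m.newIter(nil) // nil IterOptions: no lower/upper bound
        for kv := it.First(); kv != nil; kv = it.Next() {
                visit(kv.K, kv.InPlaceValue())
        }
        return it.Close()
}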
     264              : 
     265              : // newFlushIter is part of the flushable interface.
     266            1 : func (m *memTable) newFlushIter(o *IterOptions) internalIterator {
     267            1 :         return m.skl.NewFlushIter()
     268            1 : }
     269              : 
     270              : // newRangeDelIter is part of the flushable interface.
     271            1 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
     272            1 :         tombstones := m.tombstones.get()
     273            1 :         if tombstones == nil {
     274            1 :                 return nil
     275            1 :         }
     276            1 :         return keyspan.NewIter(m.cmp, tombstones)
     277              : }
     278              : 
     279              : // newRangeKeyIter is part of the flushable interface.
     280            1 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
     281            1 :         rangeKeys := m.rangeKeys.get()
     282            1 :         if rangeKeys == nil {
     283            1 :                 return nil
     284            1 :         }
     285            1 :         return keyspan.NewIter(m.cmp, rangeKeys)
     286              : }
     287              : 
     288              : // containsRangeKeys is part of the flushable interface.
     289            1 : func (m *memTable) containsRangeKeys() bool {
     290            1 :         return m.rangeKeys.count.Load() > 0
     291            1 : }
     292              : 
     293            1 : func (m *memTable) availBytes() uint32 {
     294            1 :         a := m.skl.Arena()
     295            1 :         if m.writerRefs.Load() == 1 {
     296            1 :                 // Note that one ref is maintained as long as the memtable is the
     297            1 :                 // current mutable memtable, so when evaluating whether the current
     298            1 :                 // mutable memtable has sufficient space for committing a batch, it is
      299            1 :                 // guaranteed that m.writerRefs.Load() >= 1. This means a writerRefs
      300            1 :                 // value of 1 indicates there are no other concurrent apply operations.
     301            1 :                 //
     302            1 :                 // If there are no other concurrent apply operations, we can update the
      303            1 :                 // reserved bytes setting to accurately reflect how many bytes have been
     304            1 :                 // allocated vs the over-estimation present in memTableEntrySize.
     305            1 :                 m.reserved = a.Size()
     306            1 :         }
     307            1 :         return a.Capacity() - m.reserved
     308              : }
     309              : 
     310              : // inuseBytes is part of the flushable interface.
     311            1 : func (m *memTable) inuseBytes() uint64 {
     312            1 :         return uint64(m.skl.Size() - memTableEmptySize)
     313            1 : }
     314              : 
     315              : // totalBytes is part of the flushable interface.
     316            1 : func (m *memTable) totalBytes() uint64 {
     317            1 :         return uint64(m.skl.Arena().Capacity())
     318            1 : }
     319              : 
     320              : // empty returns whether the MemTable has no key/value pairs.
     321            0 : func (m *memTable) empty() bool {
     322            0 :         return m.skl.Size() == memTableEmptySize
     323            0 : }
     324              : 
     325              : // computePossibleOverlaps is part of the flushable interface.
     326            1 : func (m *memTable) computePossibleOverlaps(fn func(bounded) shouldContinue, bounded ...bounded) {
     327            1 :         computePossibleOverlapsGenericImpl[*memTable](m, m.cmp, fn, bounded)
     328            1 : }
     329              : 
     330              : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
     331              : // kind at a particular moment for a memtable.
     332              : //
     333              : // When a new span of a particular kind is added to the memtable, it may overlap
     334              : // with other spans of the same kind. Instead of performing the fragmentation
     335              : // whenever an iterator requires it, fragments are cached within a keySpanCache
     336              : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
     337              : //
     338              : // The count of keys (and keys of any given kind) in a memtable only
     339              : // monotonically increases. The count of key spans of a particular kind is used
     340              : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
      341              : // fragmented state of the memtable's keys of a given kind at the moment when
     342              : // there existed `count` keys of that kind in the memtable.
     343              : //
      344              : // It is used to hold fragmented range deletion tombstones and range keys.
     345              : type keySpanFrags struct {
     346              :         count uint32
     347              :         once  sync.Once
     348              :         spans []keyspan.Span
     349              : }
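
As a concrete (made-up) illustration of what "fragmented" means for the spans cached here: overlapping spans of one kind are cut into non-overlapping fragments, each carrying all of the keys that cover it. Keys and seqnums below are invented.

// Hypothetical fragmentation example:
//
//   spans added to the memtable:   [a, e)#5.RANGEDEL   [c, g)#3.RANGEDEL
//
//   fragments cached in a keySpanFrags (via keyspan.Fragmenter):
//     [a, c)  keys {#5}
//     [c, e)  keys {#5, #3}   <- both tombstones overlap this fragment
//     [e, g)  keys {#3}
//
// The keySpanFrags that holds these fragments records count=2, i.e. the
// number of spans of this kind that existed when the cache was populated.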
     350              : 
     351              : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
     352              : 
     353              : func rangeDelConstructSpan(
     354              :         ik base.InternalKey, v []byte, keysDst []keyspan.Key,
     355            1 : ) (keyspan.Span, error) {
     356            1 :         return rangedel.Decode(ik, v, keysDst), nil
     357            1 : }
     358              : 
     359              : // get retrieves the fragmented spans, populating them if necessary. Note that
     360              : // the populated span fragments may be built from more than f.count memTable
     361              : // spans, but that is ok for correctness. All we're requiring is that the
     362              : // memTable contains at least f.count keys of the configured kind. This
     363              : // situation can occur if there are multiple concurrent additions of the key
     364              : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
      365              : // it even though it has been invalidated (i.e. replaced with a newer
     366              : // keySpanFrags).
     367              : func (f *keySpanFrags) get(
     368              :         skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
     369            1 : ) []keyspan.Span {
     370            1 :         f.once.Do(func() {
     371            1 :                 frag := &keyspan.Fragmenter{
     372            1 :                         Cmp:    cmp,
     373            1 :                         Format: formatKey,
     374            1 :                         Emit: func(fragmented keyspan.Span) {
     375            1 :                                 f.spans = append(f.spans, fragmented)
     376            1 :                         },
     377              :                 }
     378            1 :                 it := skl.NewIter(nil, nil)
     379            1 :                 var keysDst []keyspan.Key
     380            1 :                 for kv := it.First(); kv != nil; kv = it.Next() {
     381            1 :                         s, err := constructSpan(kv.K, kv.InPlaceValue(), keysDst)
     382            1 :                         if err != nil {
     383            0 :                                 panic(err)
     384              :                         }
     385            1 :                         frag.Add(s)
     386            1 :                         keysDst = s.Keys[len(s.Keys):]
     387              :                 }
     388            1 :                 frag.Finish()
     389              :         })
     390            1 :         return f.spans
     391              : }
     392              : 
     393              : // A keySpanCache is used to cache a set of fragmented spans. The cache is
     394              : // invalidated whenever a key of the same kind is added to a memTable, and
      395              : // populated, if empty, when a span iterator of that key kind is created.
     396              : type keySpanCache struct {
     397              :         count         atomic.Uint32
     398              :         frags         atomic.Pointer[keySpanFrags]
     399              :         cmp           Compare
     400              :         formatKey     base.FormatKey
     401              :         constructSpan constructSpan
     402              :         skl           *arenaskl.Skiplist
     403              : }
     404              : 
     405              : // Invalidate the current set of cached spans, indicating the number of
     406              : // spans that were added.
     407            1 : func (c *keySpanCache) invalidate(count uint32) {
     408            1 :         newCount := c.count.Add(count)
     409            1 :         var frags *keySpanFrags
     410            1 : 
     411            1 :         for {
     412            1 :                 oldFrags := c.frags.Load()
     413            1 :                 if oldFrags != nil && oldFrags.count >= newCount {
     414            0 :                         // Someone else invalidated the cache before us and their invalidation
     415            0 :                         // subsumes ours.
     416            0 :                         break
     417              :                 }
     418            1 :                 if frags == nil {
     419            1 :                         frags = &keySpanFrags{count: newCount}
     420            1 :                 }
     421            1 :                 if c.frags.CompareAndSwap(oldFrags, frags) {
     422            1 :                         // We successfully invalidated the cache.
     423            1 :                         break
     424              :                 }
     425              :                 // Someone else invalidated the cache. Loop and try again.
     426              :         }
     427              : }
     428              : 
     429            1 : func (c *keySpanCache) get() []keyspan.Span {
     430            1 :         frags := c.frags.Load()
     431            1 :         if frags == nil {
     432            1 :                 return nil
     433            1 :         }
     434            1 :         return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
     435              : }
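
To tie the pieces together, here is how writers and readers are expected to use the cache described above, as wired up by init(), apply(), and newRangeDelIter() in this file; shown as an annotated trace rather than new API.

//   writer path (apply):
//       m.rangeDelSkl.Add(ikey, value)   // store the RANGEDEL in its skiplist
//       m.tombstones.invalidate(1)       // publish a new, not-yet-populated keySpanFrags
//
//   reader path (newRangeDelIter):
//       spans := m.tombstones.get()      // first reader fragments once via sync.Once
//       keyspan.NewIter(m.cmp, spans)    // later readers reuse the cached fragments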
        

Generated by: LCOV version 2.0-1