LCOV - code coverage report
Current view: top level - pebble - mem_table.go (source / functions)
Test: 2024-02-14 08:15Z 11b5d32f - meta test only.lcov
Date: 2024-02-14 08:16:41

                 Hit    Total   Coverage
Lines:           185      219     84.5 %
Functions:         0        0        -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "fmt"
      10             :         "os"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan"
      18             :         "github.com/cockroachdb/pebble/internal/manual"
      19             :         "github.com/cockroachdb/pebble/internal/rangedel"
      20             :         "github.com/cockroachdb/pebble/internal/rangekey"
      21             : )
      22             : 
      23           1 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
      24           1 :         return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
      25           1 : }
      26             : 
      27             : // memTableEmptySize is the amount of allocated space in the arena when the
      28             : // memtable is empty.
      29           1 : var memTableEmptySize = func() uint32 {
      30           1 :         var pointSkl arenaskl.Skiplist
      31           1 :         var rangeDelSkl arenaskl.Skiplist
      32           1 :         var rangeKeySkl arenaskl.Skiplist
      33           1 :         arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
      34           1 :         pointSkl.Reset(arena, bytes.Compare)
      35           1 :         rangeDelSkl.Reset(arena, bytes.Compare)
      36           1 :         rangeKeySkl.Reset(arena, bytes.Compare)
      37           1 :         return arena.Size()
      38           1 : }()
      39             : 
      40             : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
      41             : // but append-only. Records are added, but never removed. Deletion is supported
      42             : // via tombstones, but it is up to higher level code (see Iterator) to support
      43             : // processing those tombstones.
      44             : //
      45             : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
      46             : // arena is a fixed size contiguous chunk of memory (see
      47             : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
      48             : // time of creation (with the exception of the cached fragmented range
      49             : // tombstones). The arena-backed skiplist provides both forward and reverse
      50             : // links which makes forward and reverse iteration the same speed.
      51             : //
      52             : // A batch is "applied" to a memTable in a two step process: prepare(batch) ->
      53             : // apply(batch). memTable.prepare() is not thread-safe and must be called with
      54             : // external synchronization. Preparation reserves space in the memTable for the
      55             : // batch. Note that we pessimistically compute how much space a batch will
      56             : // consume in the memTable (see memTableEntrySize and
      57             : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
      58             : // the memTable can be performed concurrently with other apply
       59             : // operations. Applying a batch is an O(N log M) operation where N is the
       60             : // number of records in the batch and M is the number of records in the
       61             : // memtable. The commitPipeline serializes batch preparation, and allows
       62             : // batch application to proceed concurrently.
      63             : //
      64             : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
      65             : type memTable struct {
      66             :         cmp         Compare
      67             :         formatKey   base.FormatKey
      68             :         equal       Equal
      69             :         arenaBuf    []byte
      70             :         skl         arenaskl.Skiplist
      71             :         rangeDelSkl arenaskl.Skiplist
      72             :         rangeKeySkl arenaskl.Skiplist
      73             :         // reserved tracks the amount of space used by the memtable, both by actual
      74             :         // data stored in the memtable as well as inflight batch commit
      75             :         // operations. This value is incremented pessimistically by prepare() in
      76             :         // order to account for the space needed by a batch.
      77             :         reserved uint32
      78             :         // writerRefs tracks the write references on the memtable. The two sources of
      79             :         // writer references are the memtable being on DB.mu.mem.queue and from
      80             :         // inflight mutations that have reserved space in the memtable but not yet
      81             :         // applied. The memtable cannot be flushed to disk until the writer refs
      82             :         // drops to zero.
      83             :         writerRefs atomic.Int32
      84             :         tombstones keySpanCache
      85             :         rangeKeys  keySpanCache
      86             :         // The current logSeqNum at the time the memtable was created. This is
      87             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88             :         logSeqNum                    uint64
      89             :         releaseAccountingReservation func()
      90             : }
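
As a companion to the comment above, the two-step commit flow can be sketched
in isolation. This is a minimal stand-in, assuming simplified types (toyTable,
errFull) that are not pebble's API; a mutex stands in for the commitPipeline's
serialization of prepare:

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
    )

    // toyTable mimics the reservation and refcount scheme described above.
    type toyTable struct {
        mu       sync.Mutex // stands in for the commitPipeline
        capacity uint32
        reserved uint32
        refs     atomic.Int32
    }

    var errFull = errors.New("toy: arena full")

    // prepare pessimistically reserves space for a batch and takes a
    // writer ref; it is an O(1) operation.
    func (t *toyTable) prepare(size uint32) error {
        t.mu.Lock()
        defer t.mu.Unlock()
        if size > t.capacity-t.reserved {
            return errFull // the caller would rotate to a new memtable
        }
        t.reserved += size
        t.refs.Add(1)
        return nil
    }

    // apply would insert the batch's records into the skiplist; applies of
    // different batches may run concurrently. The writer ref is dropped
    // once the records are visible.
    func (t *toyTable) apply() { t.refs.Add(-1) }

    func main() {
        t := &toyTable{capacity: 1 << 20}
        t.refs.Store(1) // ref held while the table is the mutable memtable
        if err := t.prepare(512); err == nil {
            t.apply()
        }
        fmt.Println("reserved:", t.reserved, "refs:", t.refs.Load())
    }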
      91             : 
      92           1 : func (m *memTable) free() {
      93           1 :         if m != nil {
      94           1 :                 m.releaseAccountingReservation()
      95           1 :                 manual.Free(m.arenaBuf)
      96           1 :                 m.arenaBuf = nil
      97           1 :         }
      98             : }
      99             : 
      100             : // memTableOptions holds configuration used when creating a memTable. All of
      101             : // the fields are optional; any unspecified field is filled with a default,
      102             : // which is convenient for tests.
     103             : type memTableOptions struct {
     104             :         *Options
     105             :         arenaBuf                     []byte
     106             :         size                         int
     107             :         logSeqNum                    uint64
     108             :         releaseAccountingReservation func()
     109             : }
     110             : 
     111           1 : func checkMemTable(obj interface{}) {
     112           1 :         m := obj.(*memTable)
     113           1 :         if m.arenaBuf != nil {
     114           0 :                 fmt.Fprintf(os.Stderr, "%p: memTable buffer was not freed\n", m.arenaBuf)
     115           0 :                 os.Exit(1)
     116           0 :         }
     117             : }
     118             : 
      119             : // newMemTable returns a new memTable of the specified size. If size is zero,
     120             : // Options.MemTableSize is used instead.
     121           0 : func newMemTable(opts memTableOptions) *memTable {
     122           0 :         opts.Options = opts.Options.EnsureDefaults()
     123           0 :         m := new(memTable)
     124           0 :         m.init(opts)
     125           0 :         return m
     126           0 : }
     127             : 
     128           1 : func (m *memTable) init(opts memTableOptions) {
     129           1 :         if opts.size == 0 {
     130           1 :                 opts.size = int(opts.MemTableSize)
     131           1 :         }
     132           1 :         *m = memTable{
     133           1 :                 cmp:                          opts.Comparer.Compare,
     134           1 :                 formatKey:                    opts.Comparer.FormatKey,
     135           1 :                 equal:                        opts.Comparer.Equal,
     136           1 :                 arenaBuf:                     opts.arenaBuf,
     137           1 :                 logSeqNum:                    opts.logSeqNum,
     138           1 :                 releaseAccountingReservation: opts.releaseAccountingReservation,
     139           1 :         }
     140           1 :         m.writerRefs.Store(1)
     141           1 :         m.tombstones = keySpanCache{
     142           1 :                 cmp:           m.cmp,
     143           1 :                 formatKey:     m.formatKey,
     144           1 :                 skl:           &m.rangeDelSkl,
     145           1 :                 constructSpan: rangeDelConstructSpan,
     146           1 :         }
     147           1 :         m.rangeKeys = keySpanCache{
     148           1 :                 cmp:           m.cmp,
     149           1 :                 formatKey:     m.formatKey,
     150           1 :                 skl:           &m.rangeKeySkl,
     151           1 :                 constructSpan: rangekey.Decode,
     152           1 :         }
     153           1 : 
     154           1 :         if m.arenaBuf == nil {
     155           0 :                 m.arenaBuf = make([]byte, opts.size)
     156           0 :         }
     157             : 
     158           1 :         arena := arenaskl.NewArena(m.arenaBuf)
     159           1 :         m.skl.Reset(arena, m.cmp)
     160           1 :         m.rangeDelSkl.Reset(arena, m.cmp)
     161           1 :         m.rangeKeySkl.Reset(arena, m.cmp)
     162           1 :         m.reserved = arena.Size()
     163             : }
     164             : 
     165           1 : func (m *memTable) writerRef() {
     166           1 :         switch v := m.writerRefs.Add(1); {
     167           0 :         case v <= 1:
     168           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     169             :         }
     170             : }
     171             : 
     172             : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
     173           1 : func (m *memTable) writerUnref() (wasLastRef bool) {
     174           1 :         switch v := m.writerRefs.Add(-1); {
     175           0 :         case v < 0:
     176           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     177           1 :         case v == 0:
     178           1 :                 return true
     179           1 :         default:
     180           1 :                 return false
     181             :         }
     182             : }
     183             : 
     184             : // readyForFlush is part of the flushable interface.
     185           1 : func (m *memTable) readyForFlush() bool {
     186           1 :         return m.writerRefs.Load() == 0
     187           1 : }
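
Concretely, the reference counting above plays out as follows. A
self-contained sketch in which the atomic counter mirrors writerRefs and the
events are illustrative:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var writerRefs atomic.Int32
        writerRefs.Store(1) // init(): ref held while on DB.mu.mem.queue

        writerRefs.Add(1) // prepare(): one ref per in-flight batch

        // writerUnref() after the batch applies: the queue's ref remains,
        // so this is not the last ref and the memtable cannot flush yet.
        fmt.Println("last ref?", writerRefs.Add(-1) == 0) // false

        // Rotating to a new mutable memtable drops the queue's ref; now
        // readyForFlush() would return true.
        fmt.Println("last ref?", writerRefs.Add(-1) == 0) // true
    }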
     188             : 
      189             : // prepare reserves space for the batch in the memtable and references the
      190             : // memtable, preventing it from being flushed until the batch is applied.
      191             : // Note that prepare is not thread-safe, while apply is. The caller must
      192             : // call writerUnref() after the batch has been applied.
     193           1 : func (m *memTable) prepare(batch *Batch) error {
     194           1 :         avail := m.availBytes()
     195           1 :         if batch.memTableSize > uint64(avail) {
     196           1 :                 return arenaskl.ErrArenaFull
     197           1 :         }
     198           1 :         m.reserved += uint32(batch.memTableSize)
     199           1 : 
     200           1 :         m.writerRef()
     201           1 :         return nil
     202             : }
     203             : 
     204           1 : func (m *memTable) apply(batch *Batch, seqNum uint64) error {
     205           1 :         if seqNum < m.logSeqNum {
     206           0 :                 return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
     207           0 :                         errors.Safe(seqNum), errors.Safe(m.logSeqNum))
     208           0 :         }
     209             : 
     210           1 :         var ins arenaskl.Inserter
     211           1 :         var tombstoneCount, rangeKeyCount uint32
     212           1 :         startSeqNum := seqNum
     213           1 :         for r := batch.Reader(); ; seqNum++ {
     214           1 :                 kind, ukey, value, ok, err := r.Next()
     215           1 :                 if !ok {
     216           1 :                         if err != nil {
     217           0 :                                 return err
     218           0 :                         }
     219           1 :                         break
     220             :                 }
     221           1 :                 ikey := base.MakeInternalKey(ukey, seqNum, kind)
     222           1 :                 switch kind {
     223           1 :                 case InternalKeyKindRangeDelete:
     224           1 :                         err = m.rangeDelSkl.Add(ikey, value)
     225           1 :                         tombstoneCount++
     226           1 :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     227           1 :                         err = m.rangeKeySkl.Add(ikey, value)
     228           1 :                         rangeKeyCount++
     229           1 :                 case InternalKeyKindLogData:
     230           1 :                         // Don't increment seqNum for LogData, since these are not applied
     231           1 :                         // to the memtable.
     232           1 :                         seqNum--
     233           0 :                 case InternalKeyKindIngestSST:
     234           0 :                         panic("pebble: cannot apply ingested sstable key kind to memtable")
     235           1 :                 default:
     236           1 :                         err = ins.Add(&m.skl, ikey, value)
     237             :                 }
     238           1 :                 if err != nil {
     239           0 :                         return err
     240           0 :                 }
     241             :         }
     242           1 :         if seqNum != startSeqNum+uint64(batch.Count()) {
     243           0 :                 return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
     244           0 :                         errors.Safe(seqNum), errors.Safe(startSeqNum+uint64(batch.Count())))
     245           0 :         }
     246           1 :         if tombstoneCount != 0 {
     247           1 :                 m.tombstones.invalidate(tombstoneCount)
     248           1 :         }
     249           1 :         if rangeKeyCount != 0 {
     250           1 :                 m.rangeKeys.invalidate(rangeKeyCount)
     251           1 :         }
     252           1 :         return nil
     253             : }
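
The seqnum bookkeeping in apply can be illustrated with a worked example. The
record kinds below are placeholder strings, and the sketch assumes, as the
final consistency check implies, that batch.Count() excludes LogData records:

    package main

    import "fmt"

    func main() {
        // Every record consumes one sequence number except LogData, which
        // is never applied to the memtable.
        kinds := []string{"SET", "LOGDATA", "DELETE", "RANGEDEL"}
        startSeqNum := uint64(100)
        seqNum := startSeqNum
        count := uint64(0) // stand-in for batch.Count()
        for _, kind := range kinds {
            if kind == "LOGDATA" {
                continue // consumes no seqnum (mirrors apply's seqNum--)
            }
            fmt.Printf("%-8s -> seqnum %d\n", kind, seqNum)
            seqNum++
            count++
        }
        // apply's consistency check: seqNum == startSeqNum + batch.Count().
        fmt.Println("consistent:", seqNum == startSeqNum+count)
    }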
     254             : 
     255             : // newIter is part of the flushable interface. It returns an iterator that is
     256             : // unpositioned (Iterator.Valid() will return false). The iterator can be
     257             : // positioned via a call to SeekGE, SeekLT, First or Last.
     258           1 : func (m *memTable) newIter(o *IterOptions) internalIterator {
     259           1 :         return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
     260           1 : }
     261             : 
     262             : // newFlushIter is part of the flushable interface.
     263           1 : func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
     264           1 :         return m.skl.NewFlushIter(bytesFlushed)
     265           1 : }
     266             : 
     267             : // newRangeDelIter is part of the flushable interface.
     268           1 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
     269           1 :         tombstones := m.tombstones.get()
     270           1 :         if tombstones == nil {
     271           1 :                 return nil
     272           1 :         }
     273           1 :         return keyspan.NewIter(m.cmp, tombstones)
     274             : }
     275             : 
     276             : // newRangeKeyIter is part of the flushable interface.
     277           1 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
     278           1 :         rangeKeys := m.rangeKeys.get()
     279           1 :         if rangeKeys == nil {
     280           1 :                 return nil
     281           1 :         }
     282           1 :         return keyspan.NewIter(m.cmp, rangeKeys)
     283             : }
     284             : 
     285             : // containsRangeKeys is part of the flushable interface.
     286           1 : func (m *memTable) containsRangeKeys() bool {
     287           1 :         return m.rangeKeys.count.Load() > 0
     288           1 : }
     289             : 
     290           1 : func (m *memTable) availBytes() uint32 {
     291           1 :         a := m.skl.Arena()
     292           1 :         if m.writerRefs.Load() == 1 {
     293           1 :                 // If there are no other concurrent apply operations, we can update the
      294           1 :                 // reserved bytes setting to accurately reflect how many bytes have been
     295           1 :                 // allocated vs the over-estimation present in memTableEntrySize.
     296           1 :                 m.reserved = a.Size()
     297           1 :         }
     298           1 :         return a.Capacity() - m.reserved
     299             : }
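
With made-up numbers, the correction availBytes performs looks like this (all
figures are illustrative):

    package main

    import "fmt"

    func main() {
        const capacity = uint32(1 << 10)
        var reserved uint32

        // prepare(): reserve the pessimistic, memTableEntrySize-style
        // estimate for a batch.
        reserved += 120

        // After the batch applies, the arena has actually allocated less
        // than the estimate.
        arenaSize := uint32(96)

        // availBytes(): with no concurrent applies (writerRefs == 1), snap
        // reserved back to the arena's true size, reclaiming the
        // over-estimate.
        writerRefs := int32(1)
        if writerRefs == 1 {
            reserved = arenaSize
        }
        fmt.Println("avail:", capacity-reserved) // 928 rather than 904
    }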
     300             : 
     301             : // inuseBytes is part of the flushable interface.
     302           1 : func (m *memTable) inuseBytes() uint64 {
     303           1 :         return uint64(m.skl.Size() - memTableEmptySize)
     304           1 : }
     305             : 
     306             : // totalBytes is part of the flushable interface.
     307           1 : func (m *memTable) totalBytes() uint64 {
     308           1 :         return uint64(m.skl.Arena().Capacity())
     309           1 : }
     310             : 
      311             : // empty returns whether the memTable has no key/value pairs.
     312           0 : func (m *memTable) empty() bool {
     313           0 :         return m.skl.Size() == memTableEmptySize
     314           0 : }
     315             : 
     316             : // computePossibleOverlaps is part of the flushable interface.
     317           1 : func (m *memTable) computePossibleOverlaps(fn func(bounded) shouldContinue, bounded ...bounded) {
     318           1 :         computePossibleOverlapsGenericImpl[*memTable](m, m.cmp, fn, bounded)
     319           1 : }
     320             : 
     321             : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
     322             : // kind at a particular moment for a memtable.
     323             : //
     324             : // When a new span of a particular kind is added to the memtable, it may overlap
     325             : // with other spans of the same kind. Instead of performing the fragmentation
     326             : // whenever an iterator requires it, fragments are cached within a keySpanCache
     327             : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
     328             : //
     329             : // The count of keys (and keys of any given kind) in a memtable only
     330             : // monotonically increases. The count of key spans of a particular kind is used
      331             : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
      332             : // fragmented state of the memtable's keys of a given kind at the moment
      333             : // when there existed `count` keys of that kind in the memtable.
      334             : //
      335             : // It is used to hold fragmented range deletion tombstones and range keys.
     336             : type keySpanFrags struct {
     337             :         count uint32
     338             :         once  sync.Once
     339             :         spans []keyspan.Span
     340             : }
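
The count-stamped caching scheme described above reduces to a small
self-contained sketch. The names (frags, cache) and the string spans are
stand-ins for pebble's types:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // frags is a lazily populated snapshot stamped with the key count it
    // was built for.
    type frags struct {
        count uint32
        once  sync.Once
        spans []string // stand-in for []keyspan.Span
    }

    type cache struct {
        count atomic.Uint32
        cur   atomic.Pointer[frags]
    }

    // invalidate publishes a new, empty snapshot unless a newer one
    // already exists; losers of the CAS race retry.
    func (c *cache) invalidate(n uint32) {
        newCount := c.count.Add(n)
        var f *frags
        for {
            old := c.cur.Load()
            if old != nil && old.count >= newCount {
                return // a newer invalidation subsumes ours
            }
            if f == nil {
                f = &frags{count: newCount}
            }
            if c.cur.CompareAndSwap(old, f) {
                return
            }
        }
    }

    // get populates the current snapshot at most once and returns it.
    func (c *cache) get(build func() []string) []string {
        f := c.cur.Load()
        if f == nil {
            return nil
        }
        f.once.Do(func() { f.spans = build() })
        return f.spans
    }

    func main() {
        var c cache
        c.invalidate(1)
        fmt.Println(c.get(func() []string { return []string{"[a,b)#1"} }))
    }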
     341             : 
     342             : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
     343             : 
     344             : func rangeDelConstructSpan(
     345             :         ik base.InternalKey, v []byte, keysDst []keyspan.Key,
     346           1 : ) (keyspan.Span, error) {
     347           1 :         return rangedel.Decode(ik, v, keysDst), nil
     348           1 : }
     349             : 
     350             : // get retrieves the fragmented spans, populating them if necessary. Note that
     351             : // the populated span fragments may be built from more than f.count memTable
     352             : // spans, but that is ok for correctness. All we're requiring is that the
     353             : // memTable contains at least f.count keys of the configured kind. This
     354             : // situation can occur if there are multiple concurrent additions of the key
     355             : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
      356             : // it even though it has been invalidated (i.e. replaced with a newer
     357             : // keySpanFrags).
     358             : func (f *keySpanFrags) get(
     359             :         skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
     360           1 : ) []keyspan.Span {
     361           1 :         f.once.Do(func() {
     362           1 :                 frag := &keyspan.Fragmenter{
     363           1 :                         Cmp:    cmp,
     364           1 :                         Format: formatKey,
     365           1 :                         Emit: func(fragmented keyspan.Span) {
     366           1 :                                 f.spans = append(f.spans, fragmented)
     367           1 :                         },
     368             :                 }
     369           1 :                 it := skl.NewIter(nil, nil)
     370           1 :                 var keysDst []keyspan.Key
     371           1 :                 for key, val := it.First(); key != nil; key, val = it.Next() {
     372           1 :                         s, err := constructSpan(*key, val.InPlaceValue(), keysDst)
     373           1 :                         if err != nil {
     374           0 :                                 panic(err)
     375             :                         }
     376           1 :                         frag.Add(s)
     377           1 :                         keysDst = s.Keys[len(s.Keys):]
     378             :                 }
     379           1 :                 frag.Finish()
     380             :         })
     381           1 :         return f.spans
     382             : }
     383             : 
      384             : // A keySpanCache is used to cache a set of fragmented spans. The cache is
      385             : // invalidated whenever a key of the relevant kind is added to a memTable,
      386             : // and lazily populated when a span iterator of that key kind is created.
     387             : type keySpanCache struct {
     388             :         count         atomic.Uint32
     389             :         frags         atomic.Pointer[keySpanFrags]
     390             :         cmp           Compare
     391             :         formatKey     base.FormatKey
     392             :         constructSpan constructSpan
     393             :         skl           *arenaskl.Skiplist
     394             : }
     395             : 
      396             : // invalidate discards the current set of cached spans, recording the number
      397             : // of spans that were added.
     398           1 : func (c *keySpanCache) invalidate(count uint32) {
     399           1 :         newCount := c.count.Add(count)
     400           1 :         var frags *keySpanFrags
     401           1 : 
     402           1 :         for {
     403           1 :                 oldFrags := c.frags.Load()
     404           1 :                 if oldFrags != nil && oldFrags.count >= newCount {
     405           0 :                         // Someone else invalidated the cache before us and their invalidation
     406           0 :                         // subsumes ours.
     407           0 :                         break
     408             :                 }
     409           1 :                 if frags == nil {
     410           1 :                         frags = &keySpanFrags{count: newCount}
     411           1 :                 }
     412           1 :                 if c.frags.CompareAndSwap(oldFrags, frags) {
     413           1 :                         // We successfully invalidated the cache.
     414           1 :                         break
     415             :                 }
     416             :                 // Someone else invalidated the cache. Loop and try again.
     417             :         }
     418             : }
     419             : 
     420           1 : func (c *keySpanCache) get() []keyspan.Span {
     421           1 :         frags := c.frags.Load()
     422           1 :         if frags == nil {
     423           1 :                 return nil
     424           1 :         }
     425           1 :         return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
     426             : }

Generated by: LCOV version 1.14