LCOV - code coverage report
Current view: top level - pebble - mem_table.go (source / functions)
Test: 2023-10-14 08:17Z bbbf3df1 - tests only.lcov
Date: 2023-10-14 08:17:44
                 Hit    Total    Coverage
Lines:           193    214      90.2 %
Functions:       0      0        -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "fmt"
      10             :         "os"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan"
      18             :         "github.com/cockroachdb/pebble/internal/manual"
      19             :         "github.com/cockroachdb/pebble/internal/rangedel"
      20             :         "github.com/cockroachdb/pebble/internal/rangekey"
      21             : )
      22             : 
      23           1 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
      24           1 :         return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
      25           1 : }
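
// Illustrative sketch, not part of mem_table.go: summing memTableEntrySize
// over a batch's records reproduces the pessimistic reservation that
// prepare() charges against the arena (Batch.memTableSize is assumed to be
// accumulated with this same helper). The "+8" above accounts for the
// 8-byte internal key trailer (sequence number and kind) appended to the
// user key.
func estimateBatchReservation(records []struct{ key, value []byte }) uint64 {
        var total uint64
        for _, r := range records {
                total += memTableEntrySize(len(r.key), len(r.value))
        }
        return total
}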
      26             : 
      27             : // memTableEmptySize is the amount of allocated space in the arena when the
      28             : // memtable is empty.
      29           1 : var memTableEmptySize = func() uint32 {
      30           1 :         var pointSkl arenaskl.Skiplist
      31           1 :         var rangeDelSkl arenaskl.Skiplist
      32           1 :         var rangeKeySkl arenaskl.Skiplist
      33           1 :         arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
      34           1 :         pointSkl.Reset(arena, bytes.Compare)
      35           1 :         rangeDelSkl.Reset(arena, bytes.Compare)
      36           1 :         rangeKeySkl.Reset(arena, bytes.Compare)
      37           1 :         return arena.Size()
      38           1 : }()
      39             : 
      40             : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
      41             : // but append-only. Records are added, but never removed. Deletion is supported
      42             : // via tombstones, but it is up to higher level code (see Iterator) to support
      43             : // processing those tombstones.
      44             : //
      45             : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
      46             : // arena is a fixed size contiguous chunk of memory (see
      47             : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
      48             : // time of creation (with the exception of the cached fragmented range
      49             : // tombstones). The arena-backed skiplist provides both forward and reverse
      50             : // links which makes forward and reverse iteration the same speed.
      51             : //
      52             : // A batch is "applied" to a memTable in a two step process: prepare(batch) ->
      53             : // apply(batch). memTable.prepare() is not thread-safe and must be called with
      54             : // external synchronization. Preparation reserves space in the memTable for the
      55             : // batch. Note that we pessimistically compute how much space a batch will
      56             : // consume in the memTable (see memTableEntrySize and
      57             : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
      58             : // the memTable can be performed concurrently with other apply
       59             : // operations. Applying a batch is an O(N log M) operation where N is the number
      60             : // of records in the batch and M is the number of records in the memtable. The
      61             : // commitPipeline serializes batch preparation, and allows batch application to
      62             : // proceed concurrently.
      63             : //
      64             : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
      65             : type memTable struct {
      66             :         cmp         Compare
      67             :         formatKey   base.FormatKey
      68             :         equal       Equal
      69             :         arenaBuf    []byte
      70             :         skl         arenaskl.Skiplist
      71             :         rangeDelSkl arenaskl.Skiplist
      72             :         rangeKeySkl arenaskl.Skiplist
      73             :         // reserved tracks the amount of space used by the memtable, both by actual
      74             :         // data stored in the memtable as well as inflight batch commit
      75             :         // operations. This value is incremented pessimistically by prepare() in
      76             :         // order to account for the space needed by a batch.
      77             :         reserved uint32
      78             :         // writerRefs tracks the write references on the memtable. The two sources of
      79             :         // writer references are the memtable being on DB.mu.mem.queue and from
      80             :         // inflight mutations that have reserved space in the memtable but not yet
      81             :         // applied. The memtable cannot be flushed to disk until the writer refs
      82             :         // drops to zero.
      83             :         writerRefs atomic.Int32
      84             :         tombstones keySpanCache
      85             :         rangeKeys  keySpanCache
      86             :         // The current logSeqNum at the time the memtable was created. This is
      87             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88             :         logSeqNum                    uint64
      89             :         releaseAccountingReservation func()
      90             : }
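
// Illustrative sketch, not part of mem_table.go: the two-step batch
// application described in the comment above, driven the way a caller such
// as the commit pipeline is expected to drive it. prepare() must be
// serialized externally; apply() may then run concurrently with other
// applies. The seqNum argument is assumed to have been assigned by the
// caller.
func commitBatchSketch(m *memTable, b *Batch, seqNum uint64) error {
        // Serialized step: pessimistically reserve space and take a writer ref.
        if err := m.prepare(b); err != nil {
                // Typically arenaskl.ErrArenaFull; a real caller would rotate to
                // a new memtable and retry there.
                return err
        }
        // Concurrent step: insert the batch's records at seqNum, seqNum+1, ...
        if err := m.apply(b, seqNum); err != nil {
                return err
        }
        // Drop the ref taken by prepare(). Once all writer refs (including the
        // one held while the memtable sits on DB.mu.mem.queue) are dropped,
        // readyForFlush() reports true.
        m.writerUnref()
        return nil
}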
      91             : 
      92           1 : func (m *memTable) free() {
      93           1 :         if m != nil {
      94           1 :                 m.releaseAccountingReservation()
      95           1 :                 manual.Free(m.arenaBuf)
      96           1 :                 m.arenaBuf = nil
      97           1 :         }
      98             : }
      99             : 
     100             : // memTableOptions holds configuration used when creating a memTable. All of
      101             : // the fields are optional and will be filled with defaults if not specified,
      102             : // which tests rely on.
     103             : type memTableOptions struct {
     104             :         *Options
     105             :         arenaBuf                     []byte
     106             :         size                         int
     107             :         logSeqNum                    uint64
     108             :         releaseAccountingReservation func()
     109             : }
     110             : 
     111           1 : func checkMemTable(obj interface{}) {
     112           1 :         m := obj.(*memTable)
     113           1 :         if m.arenaBuf != nil {
     114           0 :                 fmt.Fprintf(os.Stderr, "%p: memTable buffer was not freed\n", m.arenaBuf)
     115           0 :                 os.Exit(1)
     116           0 :         }
     117             : }
     118             : 
      119             : // newMemTable returns a new memTable of the specified size. If size is zero,
     120             : // Options.MemTableSize is used instead.
     121           1 : func newMemTable(opts memTableOptions) *memTable {
     122           1 :         opts.Options = opts.Options.EnsureDefaults()
     123           1 :         m := new(memTable)
     124           1 :         m.init(opts)
     125           1 :         return m
     126           1 : }
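
// Illustrative sketch, not part of mem_table.go: constructing a small
// standalone memtable, roughly as tests are assumed to do. EnsureDefaults
// fills in the comparer; a zero size would fall back to
// Options.MemTableSize. releaseAccountingReservation is left nil here, so
// free() must not be called on a memtable built this way.
func newTestMemTableSketch() *memTable {
        return newMemTable(memTableOptions{Options: &Options{}, size: 64 << 10})
}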
     127             : 
     128           1 : func (m *memTable) init(opts memTableOptions) {
     129           1 :         if opts.size == 0 {
     130           1 :                 opts.size = int(opts.MemTableSize)
     131           1 :         }
     132           1 :         *m = memTable{
     133           1 :                 cmp:                          opts.Comparer.Compare,
     134           1 :                 formatKey:                    opts.Comparer.FormatKey,
     135           1 :                 equal:                        opts.Comparer.Equal,
     136           1 :                 arenaBuf:                     opts.arenaBuf,
     137           1 :                 logSeqNum:                    opts.logSeqNum,
     138           1 :                 releaseAccountingReservation: opts.releaseAccountingReservation,
     139           1 :         }
     140           1 :         m.writerRefs.Store(1)
     141           1 :         m.tombstones = keySpanCache{
     142           1 :                 cmp:           m.cmp,
     143           1 :                 formatKey:     m.formatKey,
     144           1 :                 skl:           &m.rangeDelSkl,
     145           1 :                 constructSpan: rangeDelConstructSpan,
     146           1 :         }
     147           1 :         m.rangeKeys = keySpanCache{
     148           1 :                 cmp:           m.cmp,
     149           1 :                 formatKey:     m.formatKey,
     150           1 :                 skl:           &m.rangeKeySkl,
     151           1 :                 constructSpan: rangekey.Decode,
     152           1 :         }
     153           1 : 
     154           1 :         if m.arenaBuf == nil {
     155           1 :                 m.arenaBuf = make([]byte, opts.size)
     156           1 :         }
     157             : 
     158           1 :         arena := arenaskl.NewArena(m.arenaBuf)
     159           1 :         m.skl.Reset(arena, m.cmp)
     160           1 :         m.rangeDelSkl.Reset(arena, m.cmp)
     161           1 :         m.rangeKeySkl.Reset(arena, m.cmp)
     162           1 :         m.reserved = arena.Size()
     163             : }
     164             : 
     165           1 : func (m *memTable) writerRef() {
     166           1 :         switch v := m.writerRefs.Add(1); {
     167           0 :         case v <= 1:
     168           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     169             :         }
     170             : }
     171             : 
     172             : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
     173           1 : func (m *memTable) writerUnref() (wasLastRef bool) {
     174           1 :         switch v := m.writerRefs.Add(-1); {
     175           0 :         case v < 0:
     176           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     177           1 :         case v == 0:
     178           1 :                 return true
     179           1 :         default:
     180           1 :                 return false
     181             :         }
     182             : }
     183             : 
     184             : // readyForFlush is part of the flushable interface.
     185           1 : func (m *memTable) readyForFlush() bool {
     186           1 :         return m.writerRefs.Load() == 0
     187           1 : }
     188             : 
      189             : // prepare reserves space for the batch in the memtable and references the
      190             : // memtable, preventing it from being flushed until the batch is applied. Note
     191             : // that prepare is not thread-safe, while apply is. The caller must call
     192             : // writerUnref() after the batch has been applied.
     193           1 : func (m *memTable) prepare(batch *Batch) error {
     194           1 :         avail := m.availBytes()
     195           1 :         if batch.memTableSize > uint64(avail) {
     196           1 :                 return arenaskl.ErrArenaFull
     197           1 :         }
     198           1 :         m.reserved += uint32(batch.memTableSize)
     199           1 : 
     200           1 :         m.writerRef()
     201           1 :         return nil
     202             : }
     203             : 
     204           1 : func (m *memTable) apply(batch *Batch, seqNum uint64) error {
     205           1 :         if seqNum < m.logSeqNum {
     206           0 :                 return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
     207           0 :                         errors.Safe(seqNum), errors.Safe(m.logSeqNum))
     208           0 :         }
     209             : 
     210           1 :         var ins arenaskl.Inserter
     211           1 :         var tombstoneCount, rangeKeyCount uint32
     212           1 :         startSeqNum := seqNum
     213           1 :         for r := batch.Reader(); ; seqNum++ {
     214           1 :                 kind, ukey, value, ok := r.Next()
     215           1 :                 if !ok {
     216           1 :                         break
     217             :                 }
     218           1 :                 var err error
     219           1 :                 ikey := base.MakeInternalKey(ukey, seqNum, kind)
     220           1 :                 switch kind {
     221           1 :                 case InternalKeyKindRangeDelete:
     222           1 :                         err = m.rangeDelSkl.Add(ikey, value)
     223           1 :                         tombstoneCount++
     224           1 :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     225           1 :                         err = m.rangeKeySkl.Add(ikey, value)
     226           1 :                         rangeKeyCount++
     227           1 :                 case InternalKeyKindLogData:
     228           1 :                         // Don't increment seqNum for LogData, since these are not applied
      229           1 :                         // LogData records are not applied to the memtable and must not
      230           1 :                         // consume a sequence number; decrement to cancel the loop's increment.
     231           0 :                 case InternalKeyKindIngestSST:
     232           0 :                         panic("pebble: cannot apply ingested sstable key kind to memtable")
     233           1 :                 default:
     234           1 :                         err = ins.Add(&m.skl, ikey, value)
     235             :                 }
     236           1 :                 if err != nil {
     237           0 :                         return err
     238           0 :                 }
     239             :         }
     240           1 :         if seqNum != startSeqNum+uint64(batch.Count()) {
     241           0 :                 return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
     242           0 :                         errors.Safe(seqNum), errors.Safe(startSeqNum+uint64(batch.Count())))
     243           0 :         }
     244           1 :         if tombstoneCount != 0 {
     245           1 :                 m.tombstones.invalidate(tombstoneCount)
     246           1 :         }
     247           1 :         if rangeKeyCount != 0 {
     248           1 :                 m.rangeKeys.invalidate(rangeKeyCount)
     249           1 :         }
     250           1 :         return nil
     251             : }
     252             : 
     253             : // newIter is part of the flushable interface. It returns an iterator that is
     254             : // unpositioned (Iterator.Valid() will return false). The iterator can be
     255             : // positioned via a call to SeekGE, SeekLT, First or Last.
     256           1 : func (m *memTable) newIter(o *IterOptions) internalIterator {
     257           1 :         return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
     258           1 : }
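
// Illustrative sketch, not part of mem_table.go: scanning the point keys
// via the unpositioned iterator returned by newIter, mirroring the
// First/Next pattern used in keySpanFrags.get below. The in-place value
// accessor is assumed from that same loop, and internal iterators are
// assumed to require an explicit Close.
func scanPointKeysSketch(m *memTable, visit func(k *base.InternalKey, v []byte)) error {
        it := m.newIter(&IterOptions{})
        for k, v := it.First(); k != nil; k, v = it.Next() {
                visit(k, v.InPlaceValue())
        }
        return it.Close()
}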
     259             : 
     260             : // newFlushIter is part of the flushable interface.
     261           1 : func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
     262           1 :         return m.skl.NewFlushIter(bytesFlushed)
     263           1 : }
     264             : 
     265             : // newRangeDelIter is part of the flushable interface.
     266           1 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
     267           1 :         tombstones := m.tombstones.get()
     268           1 :         if tombstones == nil {
     269           1 :                 return nil
     270           1 :         }
     271           1 :         return keyspan.NewIter(m.cmp, tombstones)
     272             : }
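
// Illustrative sketch, not part of mem_table.go: draining the fragment
// iterator returned above. The First/Next-returning-*Span surface and the
// Close method are assumed from the keyspan package as of this snapshot.
func collectRangeDelsSketch(m *memTable) []keyspan.Span {
        it := m.newRangeDelIter(nil)
        if it == nil {
                // The memtable holds no range deletions.
                return nil
        }
        defer it.Close()
        var spans []keyspan.Span
        for s := it.First(); s != nil; s = it.Next() {
                spans = append(spans, *s)
        }
        return spans
}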
     273             : 
     274             : // newRangeKeyIter is part of the flushable interface.
     275           1 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
     276           1 :         rangeKeys := m.rangeKeys.get()
     277           1 :         if rangeKeys == nil {
     278           1 :                 return nil
     279           1 :         }
     280           1 :         return keyspan.NewIter(m.cmp, rangeKeys)
     281             : }
     282             : 
     283             : // containsRangeKeys is part of the flushable interface.
     284           1 : func (m *memTable) containsRangeKeys() bool {
     285           1 :         return m.rangeKeys.count.Load() > 0
     286           1 : }
     287             : 
     288           1 : func (m *memTable) availBytes() uint32 {
     289           1 :         a := m.skl.Arena()
     290           1 :         if m.writerRefs.Load() == 1 {
     291           1 :                 // If there are no other concurrent apply operations, we can update the
      292           1 :                 // reserved bytes setting to accurately reflect how many bytes have been
     293           1 :                 // allocated vs the over-estimation present in memTableEntrySize.
     294           1 :                 m.reserved = a.Size()
     295           1 :         }
     296           1 :         return a.Capacity() - m.reserved
     297             : }
     298             : 
     299             : // inuseBytes is part of the flushable interface.
     300           1 : func (m *memTable) inuseBytes() uint64 {
     301           1 :         return uint64(m.skl.Size() - memTableEmptySize)
     302           1 : }
     303             : 
     304             : // totalBytes is part of the flushable interface.
     305           1 : func (m *memTable) totalBytes() uint64 {
     306           1 :         return uint64(m.skl.Arena().Capacity())
     307           1 : }
     308             : 
      309             : // empty returns whether the memTable has no key/value pairs.
     310           1 : func (m *memTable) empty() bool {
     311           1 :         return m.skl.Size() == memTableEmptySize
     312           1 : }
     313             : 
     314             : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
     315             : // kind at a particular moment for a memtable.
     316             : //
     317             : // When a new span of a particular kind is added to the memtable, it may overlap
     318             : // with other spans of the same kind. Instead of performing the fragmentation
     319             : // whenever an iterator requires it, fragments are cached within a keySpanCache
     320             : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
     321             : //
     322             : // The count of keys (and keys of any given kind) in a memtable only
     323             : // monotonically increases. The count of key spans of a particular kind is used
     324             : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
      325             : // fragmented state of the memtable's keys of a given kind at a moment when
     326             : // there existed `count` keys of that kind in the memtable.
     327             : //
      328             : // It is used to hold fragmented range deletion tombstones and fragmented range keys.
     329             : type keySpanFrags struct {
     330             :         count uint32
     331             :         once  sync.Once
     332             :         spans []keyspan.Span
     333             : }
     334             : 
     335             : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
     336             : 
     337             : func rangeDelConstructSpan(
     338             :         ik base.InternalKey, v []byte, keysDst []keyspan.Key,
     339           1 : ) (keyspan.Span, error) {
     340           1 :         return rangedel.Decode(ik, v, keysDst), nil
     341           1 : }
     342             : 
     343             : // get retrieves the fragmented spans, populating them if necessary. Note that
     344             : // the populated span fragments may be built from more than f.count memTable
     345             : // spans, but that is ok for correctness. All we're requiring is that the
     346             : // memTable contains at least f.count keys of the configured kind. This
     347             : // situation can occur if there are multiple concurrent additions of the key
     348             : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
     349             : // it even though is has been invalidated (i.e. replaced with a newer
      350             : // it even though it has been invalidated (i.e. replaced with a newer
     351             : func (f *keySpanFrags) get(
     352             :         skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
     353           1 : ) []keyspan.Span {
     354           1 :         f.once.Do(func() {
     355           1 :                 frag := &keyspan.Fragmenter{
     356           1 :                         Cmp:    cmp,
     357           1 :                         Format: formatKey,
     358           1 :                         Emit: func(fragmented keyspan.Span) {
     359           1 :                                 f.spans = append(f.spans, fragmented)
     360           1 :                         },
     361             :                 }
     362           1 :                 it := skl.NewIter(nil, nil)
     363           1 :                 var keysDst []keyspan.Key
     364           1 :                 for key, val := it.First(); key != nil; key, val = it.Next() {
     365           1 :                         s, err := constructSpan(*key, val.InPlaceValue(), keysDst)
     366           1 :                         if err != nil {
     367           0 :                                 panic(err)
     368             :                         }
     369           1 :                         frag.Add(s)
     370           1 :                         keysDst = s.Keys[len(s.Keys):]
     371             :                 }
     372           1 :                 frag.Finish()
     373             :         })
     374           1 :         return f.spans
     375             : }
     376             : 
     377             : // A keySpanCache is used to cache a set of fragmented spans. The cache is
     378             : // invalidated whenever a key of the same kind is added to a memTable, and
      379             : // populated, if empty, when a span iterator of that key kind is created.
     380             : type keySpanCache struct {
     381             :         count         atomic.Uint32
     382             :         frags         atomic.Pointer[keySpanFrags]
     383             :         cmp           Compare
     384             :         formatKey     base.FormatKey
     385             :         constructSpan constructSpan
     386             :         skl           *arenaskl.Skiplist
     387             : }
     388             : 
     389             : // Invalidate the current set of cached spans, indicating the number of
     390             : // spans that were added.
     391           1 : func (c *keySpanCache) invalidate(count uint32) {
     392           1 :         newCount := c.count.Add(count)
     393           1 :         var frags *keySpanFrags
     394           1 : 
     395           1 :         for {
     396           1 :                 oldFrags := c.frags.Load()
     397           1 :                 if oldFrags != nil && oldFrags.count >= newCount {
     398           0 :                         // Someone else invalidated the cache before us and their invalidation
     399           0 :                         // subsumes ours.
     400           0 :                         break
     401             :                 }
     402           1 :                 if frags == nil {
     403           1 :                         frags = &keySpanFrags{count: newCount}
     404           1 :                 }
     405           1 :                 if c.frags.CompareAndSwap(oldFrags, frags) {
     406           1 :                         // We successfully invalidated the cache.
     407           1 :                         break
     408             :                 }
     409             :                 // Someone else invalidated the cache. Loop and try again.
     410             :         }
     411             : }
     412             : 
     413           1 : func (c *keySpanCache) get() []keyspan.Span {
     414           1 :         frags := c.frags.Load()
     415           1 :         if frags == nil {
     416           1 :                 return nil
     417           1 :         }
     418           1 :         return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
     419             : }
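
// Illustrative sketch, not part of mem_table.go: the writer/reader
// interplay for the caches above, shown for the tombstone cache. A writer
// that has added range deletions to rangeDelSkl bumps the cache (as apply
// does), and readers then fragment lazily, at most once per cached
// keySpanFrags, under once.Do. A reader may observe fragments built from
// more spans than its count records, which keySpanFrags.get notes is
// benign.
func rangeDelCacheSketch(m *memTable, tombstonesAdded uint32) []keyspan.Span {
        if tombstonesAdded > 0 {
                m.tombstones.invalidate(tombstonesAdded) // writer side, after skiplist adds
        }
        return m.tombstones.get() // reader side, lazily fragments the skiplist contents
}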

Generated by: LCOV version 1.14