LCOV - code coverage report
Current view: top level - pebble - mem_table.go (source / functions)
Test: 2023-10-17 08:18Z 94ccf353 - tests + meta.lcov
Date: 2023-10-17 08:20:03
                    Hit    Total   Coverage
Lines:              196      214     91.6 %
Functions:            0        0        -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "fmt"
      10             :         "os"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/arenaskl"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan"
      18             :         "github.com/cockroachdb/pebble/internal/manual"
      19             :         "github.com/cockroachdb/pebble/internal/rangedel"
      20             :         "github.com/cockroachdb/pebble/internal/rangekey"
      21             : )
      22             : 
      23           2 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
      24           2 :         return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
      25           2 : }
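// Added commentary (not in the original file): memTableEntrySize is the
// pessimistic per-record estimate used when a batch reserves space in the
// memtable via prepare. The +8 on the key accounts for the 8-byte internal key
// trailer (sequence number and kind) appended to the user key when the record
// is inserted into the arena-backed skiplist.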
      26             : 
      27             : // memTableEmptySize is the amount of allocated space in the arena when the
      28             : // memtable is empty.
      29           2 : var memTableEmptySize = func() uint32 {
      30           2 :         var pointSkl arenaskl.Skiplist
      31           2 :         var rangeDelSkl arenaskl.Skiplist
      32           2 :         var rangeKeySkl arenaskl.Skiplist
      33           2 :         arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
      34           2 :         pointSkl.Reset(arena, bytes.Compare)
      35           2 :         rangeDelSkl.Reset(arena, bytes.Compare)
      36           2 :         rangeKeySkl.Reset(arena, bytes.Compare)
      37           2 :         return arena.Size()
      38           2 : }()
      39             : 
      40             : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
      41             : // but append-only. Records are added, but never removed. Deletion is supported
      42             : // via tombstones, but it is up to higher level code (see Iterator) to support
      43             : // processing those tombstones.
      44             : //
      45             : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
      46             : // arena is a fixed size contiguous chunk of memory (see
      47             : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
      48             : // time of creation (with the exception of the cached fragmented range
      49             : // tombstones). The arena-backed skiplist provides both forward and reverse
      50             : // links which makes forward and reverse iteration the same speed.
      51             : //
      52             : // A batch is "applied" to a memTable in a two step process: prepare(batch) ->
      53             : // apply(batch). memTable.prepare() is not thread-safe and must be called with
      54             : // external synchronization. Preparation reserves space in the memTable for the
      55             : // batch. Note that we pessimistically compute how much space a batch will
      56             : // consume in the memTable (see memTableEntrySize and
      57             : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
      58             : // the memTable can be performed concurrently with other apply
      59             : // operations. Applying a batch is an O(N log M) operation where N is the number
      60             : // of records in the batch and M is the number of records in the memtable. The
      61             : // commitPipeline serializes batch preparation, and allows batch application to
      62             : // proceed concurrently.
      63             : //
      64             : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
      65             : type memTable struct {
      66             :         cmp         Compare
      67             :         formatKey   base.FormatKey
      68             :         equal       Equal
      69             :         arenaBuf    []byte
      70             :         skl         arenaskl.Skiplist
      71             :         rangeDelSkl arenaskl.Skiplist
      72             :         rangeKeySkl arenaskl.Skiplist
      73             :         // reserved tracks the amount of space used by the memtable, both by actual
      74             :         // data stored in the memtable as well as inflight batch commit
      75             :         // operations. This value is incremented pessimistically by prepare() in
      76             :         // order to account for the space needed by a batch.
      77             :         reserved uint32
      78             :         // writerRefs tracks the write references on the memtable. The two sources of
      79             :         // writer references are the memtable being on DB.mu.mem.queue and from
      80             :         // inflight mutations that have reserved space in the memtable but not yet
      81             :         // applied. The memtable cannot be flushed to disk until the writer refs
      82             :         // drops to zero.
      83             :         writerRefs atomic.Int32
      84             :         tombstones keySpanCache
      85             :         rangeKeys  keySpanCache
      86             :         // The current logSeqNum at the time the memtable was created. This is
      87             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88             :         logSeqNum                    uint64
      89             :         releaseAccountingReservation func()
      90             : }
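// Illustrative sketch (added, not part of this file): the two-step commit
// protocol described in the type comment above. In practice the commitPipeline
// performs the serialization and sequence-number assignment; the names mu, b,
// and seqNum below are assumptions for illustration only.
//
//      mu.Lock()
//      if err := mem.prepare(b); err != nil {
//              // e.g. arenaskl.ErrArenaFull: rotate to a new memtable and retry.
//              mu.Unlock()
//              return err
//      }
//      mu.Unlock()
//      // apply may run concurrently with other applies to the same memtable.
//      if err := mem.apply(b, seqNum); err != nil {
//              return err
//      }
//      // Drop the writer ref taken by prepare. The memtable cannot be flushed
//      // until all writer refs have been released (see readyForFlush).
//      mem.writerUnref()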
      91             : 
      92           2 : func (m *memTable) free() {
      93           2 :         if m != nil {
      94           2 :                 m.releaseAccountingReservation()
      95           2 :                 manual.Free(m.arenaBuf)
      96           2 :                 m.arenaBuf = nil
      97           2 :         }
      98             : }
      99             : 
     100             : // memTableOptions holds configuration used when creating a memTable. All of
      101             : // the fields are optional and are filled in with defaults if not specified,
      102             : // a convenience relied upon by tests.
     103             : type memTableOptions struct {
     104             :         *Options
     105             :         arenaBuf                     []byte
     106             :         size                         int
     107             :         logSeqNum                    uint64
     108             :         releaseAccountingReservation func()
     109             : }
     110             : 
     111           2 : func checkMemTable(obj interface{}) {
     112           2 :         m := obj.(*memTable)
     113           2 :         if m.arenaBuf != nil {
     114           0 :                 fmt.Fprintf(os.Stderr, "%p: memTable buffer was not freed\n", m.arenaBuf)
     115           0 :                 os.Exit(1)
     116           0 :         }
     117             : }
     118             : 
     119             : // newMemTable returns a new MemTable of the specified size. If size is zero,
     120             : // Options.MemTableSize is used instead.
     121           1 : func newMemTable(opts memTableOptions) *memTable {
     122           1 :         opts.Options = opts.Options.EnsureDefaults()
     123           1 :         m := new(memTable)
     124           1 :         m.init(opts)
     125           1 :         return m
     126           1 : }
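// Illustrative sketch (added, not part of this file): constructing a memtable
// directly, as a test might. Leaving size zero falls back to
// Options.MemTableSize after EnsureDefaults; the option values shown here are
// assumptions for illustration.
//
//      mem := newMemTable(memTableOptions{
//              Options: &Options{}, // remaining fields filled by EnsureDefaults
//              size:    1 << 20,    // 1 MB arena instead of MemTableSize
//      })
//      // mem starts with a single writer ref; since arenaBuf was nil, init
//      // allocates the arena buffer on the Go heap.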
     127             : 
     128           2 : func (m *memTable) init(opts memTableOptions) {
     129           2 :         if opts.size == 0 {
     130           2 :                 opts.size = int(opts.MemTableSize)
     131           2 :         }
     132           2 :         *m = memTable{
     133           2 :                 cmp:                          opts.Comparer.Compare,
     134           2 :                 formatKey:                    opts.Comparer.FormatKey,
     135           2 :                 equal:                        opts.Comparer.Equal,
     136           2 :                 arenaBuf:                     opts.arenaBuf,
     137           2 :                 logSeqNum:                    opts.logSeqNum,
     138           2 :                 releaseAccountingReservation: opts.releaseAccountingReservation,
     139           2 :         }
     140           2 :         m.writerRefs.Store(1)
     141           2 :         m.tombstones = keySpanCache{
     142           2 :                 cmp:           m.cmp,
     143           2 :                 formatKey:     m.formatKey,
     144           2 :                 skl:           &m.rangeDelSkl,
     145           2 :                 constructSpan: rangeDelConstructSpan,
     146           2 :         }
     147           2 :         m.rangeKeys = keySpanCache{
     148           2 :                 cmp:           m.cmp,
     149           2 :                 formatKey:     m.formatKey,
     150           2 :                 skl:           &m.rangeKeySkl,
     151           2 :                 constructSpan: rangekey.Decode,
     152           2 :         }
     153           2 : 
     154           2 :         if m.arenaBuf == nil {
     155           1 :                 m.arenaBuf = make([]byte, opts.size)
     156           1 :         }
     157             : 
     158           2 :         arena := arenaskl.NewArena(m.arenaBuf)
     159           2 :         m.skl.Reset(arena, m.cmp)
     160           2 :         m.rangeDelSkl.Reset(arena, m.cmp)
     161           2 :         m.rangeKeySkl.Reset(arena, m.cmp)
     162           2 :         m.reserved = arena.Size()
     163             : }
     164             : 
     165           2 : func (m *memTable) writerRef() {
     166           2 :         switch v := m.writerRefs.Add(1); {
     167           0 :         case v <= 1:
     168           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     169             :         }
     170             : }
     171             : 
     172             : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
     173           2 : func (m *memTable) writerUnref() (wasLastRef bool) {
     174           2 :         switch v := m.writerRefs.Add(-1); {
     175           0 :         case v < 0:
     176           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     177           2 :         case v == 0:
     178           2 :                 return true
     179           2 :         default:
     180           2 :                 return false
     181             :         }
     182             : }
     183             : 
     184             : // readyForFlush is part of the flushable interface.
     185           2 : func (m *memTable) readyForFlush() bool {
     186           2 :         return m.writerRefs.Load() == 0
     187           2 : }
     188             : 
     189             : // Prepare reserves space for the batch in the memtable and references the
     190             : // memtable preventing it from being flushed until the batch is applied. Note
     191             : // that prepare is not thread-safe, while apply is. The caller must call
     192             : // writerUnref() after the batch has been applied.
     193           2 : func (m *memTable) prepare(batch *Batch) error {
     194           2 :         avail := m.availBytes()
     195           2 :         if batch.memTableSize > uint64(avail) {
     196           2 :                 return arenaskl.ErrArenaFull
     197           2 :         }
     198           2 :         m.reserved += uint32(batch.memTableSize)
     199           2 : 
     200           2 :         m.writerRef()
     201           2 :         return nil
     202             : }
     203             : 
     204           2 : func (m *memTable) apply(batch *Batch, seqNum uint64) error {
     205           2 :         if seqNum < m.logSeqNum {
     206           0 :                 return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
     207           0 :                         errors.Safe(seqNum), errors.Safe(m.logSeqNum))
     208           0 :         }
     209             : 
     210           2 :         var ins arenaskl.Inserter
     211           2 :         var tombstoneCount, rangeKeyCount uint32
     212           2 :         startSeqNum := seqNum
     213           2 :         for r := batch.Reader(); ; seqNum++ {
     214           2 :                 kind, ukey, value, ok := r.Next()
     215           2 :                 if !ok {
     216           2 :                         break
     217             :                 }
     218           2 :                 var err error
     219           2 :                 ikey := base.MakeInternalKey(ukey, seqNum, kind)
     220           2 :                 switch kind {
     221           2 :                 case InternalKeyKindRangeDelete:
     222           2 :                         err = m.rangeDelSkl.Add(ikey, value)
     223           2 :                         tombstoneCount++
     224           2 :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     225           2 :                         err = m.rangeKeySkl.Add(ikey, value)
     226           2 :                         rangeKeyCount++
     227           1 :                 case InternalKeyKindLogData:
     228           1 :                         // Don't increment seqNum for LogData, since these are not applied
     229           1 :                         // to the memtable.
     230           1 :                         seqNum--
     231           0 :                 case InternalKeyKindIngestSST:
     232           0 :                         panic("pebble: cannot apply ingested sstable key kind to memtable")
     233           2 :                 default:
     234           2 :                         err = ins.Add(&m.skl, ikey, value)
     235             :                 }
     236           2 :                 if err != nil {
     237           0 :                         return err
     238           0 :                 }
     239             :         }
     240           2 :         if seqNum != startSeqNum+uint64(batch.Count()) {
     241           0 :                 return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
     242           0 :                         errors.Safe(seqNum), errors.Safe(startSeqNum+uint64(batch.Count())))
     243           0 :         }
     244           2 :         if tombstoneCount != 0 {
     245           2 :                 m.tombstones.invalidate(tombstoneCount)
     246           2 :         }
     247           2 :         if rangeKeyCount != 0 {
     248           2 :                 m.rangeKeys.invalidate(rangeKeyCount)
     249           2 :         }
     250           2 :         return nil
     251             : }
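// Worked example (added commentary): applying a batch containing SET("a"),
// LOGDATA, SET("b") with seqNum=100 assigns seqnum 100 to "a" and 101 to "b".
// The LogData record consumes no seqnum (the loop decrements seqNum to
// compensate), so the final consistency check
// seqNum == startSeqNum + batch.Count() still holds with Count() == 2.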
     252             : 
     253             : // newIter is part of the flushable interface. It returns an iterator that is
     254             : // unpositioned (Iterator.Valid() will return false). The iterator can be
     255             : // positioned via a call to SeekGE, SeekLT, First or Last.
     256           2 : func (m *memTable) newIter(o *IterOptions) internalIterator {
     257           2 :         return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
     258           2 : }
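// Illustrative sketch (added, not part of this file): the iterator returned by
// newIter is unpositioned, so callers must seek it before use. Passing nil
// IterOptions (no bounds) is an assumption for illustration; the iteration
// pattern mirrors the one used in keySpanFrags.get below.
//
//      it := m.newIter(nil)
//      for key, val := it.First(); key != nil; key, val = it.Next() {
//              _ = val // process key/value pairs in sorted order
//      }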
     259             : 
     260             : // newFlushIter is part of the flushable interface.
     261           2 : func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
     262           2 :         return m.skl.NewFlushIter(bytesFlushed)
     263           2 : }
     264             : 
     265             : // newRangeDelIter is part of the flushable interface.
     266           2 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
     267           2 :         tombstones := m.tombstones.get()
     268           2 :         if tombstones == nil {
     269           2 :                 return nil
     270           2 :         }
     271           2 :         return keyspan.NewIter(m.cmp, tombstones)
     272             : }
     273             : 
     274             : // newRangeKeyIter is part of the flushable interface.
     275           2 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
     276           2 :         rangeKeys := m.rangeKeys.get()
     277           2 :         if rangeKeys == nil {
     278           2 :                 return nil
     279           2 :         }
     280           2 :         return keyspan.NewIter(m.cmp, rangeKeys)
     281             : }
     282             : 
     283             : // containsRangeKeys is part of the flushable interface.
     284           2 : func (m *memTable) containsRangeKeys() bool {
     285           2 :         return m.rangeKeys.count.Load() > 0
     286           2 : }
     287             : 
     288           2 : func (m *memTable) availBytes() uint32 {
     289           2 :         a := m.skl.Arena()
     290           2 :         if m.writerRefs.Load() == 1 {
     291           2 :                 // If there are no other concurrent apply operations, we can update the
      292           2 :         // reserved bytes setting to accurately reflect how many bytes have been
     293           2 :                 // allocated vs the over-estimation present in memTableEntrySize.
     294           2 :                 m.reserved = a.Size()
     295           2 :         }
     296           2 :         return a.Capacity() - m.reserved
     297             : }
     298             : 
     299             : // inuseBytes is part of the flushable interface.
     300           2 : func (m *memTable) inuseBytes() uint64 {
     301           2 :         return uint64(m.skl.Size() - memTableEmptySize)
     302           2 : }
     303             : 
     304             : // totalBytes is part of the flushable interface.
     305           2 : func (m *memTable) totalBytes() uint64 {
     306           2 :         return uint64(m.skl.Arena().Capacity())
     307           2 : }
     308             : 
     309             : // empty returns whether the MemTable has no key/value pairs.
     310           1 : func (m *memTable) empty() bool {
     311           1 :         return m.skl.Size() == memTableEmptySize
     312           1 : }
     313             : 
     314             : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
     315             : // kind at a particular moment for a memtable.
     316             : //
     317             : // When a new span of a particular kind is added to the memtable, it may overlap
     318             : // with other spans of the same kind. Instead of performing the fragmentation
     319             : // whenever an iterator requires it, fragments are cached within a keySpanCache
     320             : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
     321             : //
     322             : // The count of keys (and keys of any given kind) in a memtable only
     323             : // monotonically increases. The count of key spans of a particular kind is used
     324             : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
      325             : // fragmented state of the memtable's keys of a given kind at the moment at
      326             : // which there existed `count` keys of that kind in the memtable.
     327             : //
     328             : // It's currently only used to contain fragmented range deletion tombstones.
     329             : type keySpanFrags struct {
     330             :         count uint32
     331             :         once  sync.Once
     332             :         spans []keyspan.Span
     333             : }
     334             : 
     335             : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
     336             : 
     337             : func rangeDelConstructSpan(
     338             :         ik base.InternalKey, v []byte, keysDst []keyspan.Key,
     339           2 : ) (keyspan.Span, error) {
     340           2 :         return rangedel.Decode(ik, v, keysDst), nil
     341           2 : }
     342             : 
     343             : // get retrieves the fragmented spans, populating them if necessary. Note that
     344             : // the populated span fragments may be built from more than f.count memTable
     345             : // spans, but that is ok for correctness. All we're requiring is that the
     346             : // memTable contains at least f.count keys of the configured kind. This
     347             : // situation can occur if there are multiple concurrent additions of the key
     348             : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
      349             : // it even though it has been invalidated (i.e. replaced with a newer
     350             : // keySpanFrags).
     351             : func (f *keySpanFrags) get(
     352             :         skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
     353           2 : ) []keyspan.Span {
     354           2 :         f.once.Do(func() {
     355           2 :                 frag := &keyspan.Fragmenter{
     356           2 :                         Cmp:    cmp,
     357           2 :                         Format: formatKey,
     358           2 :                         Emit: func(fragmented keyspan.Span) {
     359           2 :                                 f.spans = append(f.spans, fragmented)
     360           2 :                         },
     361             :                 }
     362           2 :                 it := skl.NewIter(nil, nil)
     363           2 :                 var keysDst []keyspan.Key
     364           2 :                 for key, val := it.First(); key != nil; key, val = it.Next() {
     365           2 :                         s, err := constructSpan(*key, val.InPlaceValue(), keysDst)
     366           2 :                         if err != nil {
     367           0 :                                 panic(err)
     368             :                         }
     369           2 :                         frag.Add(s)
     370           2 :                         keysDst = s.Keys[len(s.Keys):]
     371             :                 }
     372           2 :                 frag.Finish()
     373             :         })
     374           2 :         return f.spans
     375             : }
     376             : 
     377             : // A keySpanCache is used to cache a set of fragmented spans. The cache is
     378             : // invalidated whenever a key of the same kind is added to a memTable, and
      379             : // populated, if empty, when a span iterator of that key kind is created.
     380             : type keySpanCache struct {
     381             :         count         atomic.Uint32
     382             :         frags         atomic.Pointer[keySpanFrags]
     383             :         cmp           Compare
     384             :         formatKey     base.FormatKey
     385             :         constructSpan constructSpan
     386             :         skl           *arenaskl.Skiplist
     387             : }
     388             : 
     389             : // Invalidate the current set of cached spans, indicating the number of
     390             : // spans that were added.
     391           2 : func (c *keySpanCache) invalidate(count uint32) {
     392           2 :         newCount := c.count.Add(count)
     393           2 :         var frags *keySpanFrags
     394           2 : 
     395           2 :         for {
     396           2 :                 oldFrags := c.frags.Load()
     397           2 :                 if oldFrags != nil && oldFrags.count >= newCount {
     398           1 :                         // Someone else invalidated the cache before us and their invalidation
     399           1 :                         // subsumes ours.
     400           1 :                         break
     401             :                 }
     402           2 :                 if frags == nil {
     403           2 :                         frags = &keySpanFrags{count: newCount}
     404           2 :                 }
     405           2 :                 if c.frags.CompareAndSwap(oldFrags, frags) {
     406           2 :                         // We successfully invalidated the cache.
     407           2 :                         break
     408             :                 }
     409             :                 // Someone else invalidated the cache. Loop and try again.
     410             :         }
     411             : }
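// Illustrative sketch (added, not part of this file): how the writer and
// reader sides of the cache cooperate, using calls that appear elsewhere in
// this file.
//
//      // Writer side (memTable.apply): after adding range tombstones, publish
//      // a fresh, not-yet-fragmented keySpanFrags for the new count.
//      m.tombstones.invalidate(tombstoneCount)
//
//      // Reader side (memTable.newRangeDelIter): fragment the spans lazily,
//      // at most once per keySpanFrags, via sync.Once.
//      spans := m.tombstones.get()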
     412             : 
     413           2 : func (c *keySpanCache) get() []keyspan.Span {
     414           2 :         frags := c.frags.Load()
     415           2 :         if frags == nil {
     416           2 :                 return nil
     417           2 :         }
     418           2 :         return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
     419             : }

Generated by: LCOV version 1.14