Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "os"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/arenaskl"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/manual"
19 : "github.com/cockroachdb/pebble/internal/rangedel"
20 : "github.com/cockroachdb/pebble/internal/rangekey"
21 : )
22 :
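     : // memTableEntrySize returns a pessimistic estimate of the arena space a
     : // single key/value pair can consume: the maximum skiplist node size for the
     : // encoded internal key and value, where the +8 accounts for the 8-byte
     : // internal key trailer (seqnum and kind) appended to the user key.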
23 1 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
24 1 : return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
25 1 : }
26 :
27 : // memTableEmptySize is the amount of allocated space in the arena when the
28 : // memtable is empty.
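     : // It is used by inuseBytes and empty (below) to discount the skiplists'
     : // fixed allocation overhead.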
29 1 : var memTableEmptySize = func() uint32 {
30 1 : var pointSkl arenaskl.Skiplist
31 1 : var rangeDelSkl arenaskl.Skiplist
32 1 : var rangeKeySkl arenaskl.Skiplist
33 1 : arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
34 1 : pointSkl.Reset(arena, bytes.Compare)
35 1 : rangeDelSkl.Reset(arena, bytes.Compare)
36 1 : rangeKeySkl.Reset(arena, bytes.Compare)
37 1 : return arena.Size()
38 1 : }()
39 :
40 : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
41 : // but append-only. Records are added, but never removed. Deletion is supported
42 : // via tombstones, but it is up to higher level code (see Iterator) to support
43 : // processing those tombstones.
44 : //
45 : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
46 : // arena is a fixed size contiguous chunk of memory (see
47 : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
48 : // time of creation (with the exception of the cached fragmented range
49 : // tombstones and range keys). The arena-backed skiplist provides both forward
50 : // and reverse links, which makes forward and reverse iteration the same speed.
51 : //
52 : // A batch is "applied" to a memTable in a two step process: prepare(batch) ->
53 : // apply(batch). memTable.prepare() is not thread-safe and must be called with
54 : // external synchronization. Preparation reserves space in the memTable for the
55 : // batch. Note that we pessimistically compute how much space a batch will
56 : // consume in the memTable (see memTableEntrySize and
57 : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
58 : // the memTable can be performed concurrently with other apply
59 : // operations. Applying a batch is an O(N log M) operation where N is the number
60 : // of records in the batch and M is the number of records in the memtable. The
61 : // commitPipeline serializes batch preparation, and allows batch application to
62 : // proceed concurrently.
63 : //
64 : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
65 : type memTable struct {
66 : cmp Compare
67 : formatKey base.FormatKey
68 : equal Equal
69 : arenaBuf manual.Buf
70 : skl arenaskl.Skiplist
71 : rangeDelSkl arenaskl.Skiplist
72 : rangeKeySkl arenaskl.Skiplist
73 : // reserved tracks the amount of space used by the memtable, both by actual
74 : // data stored in the memtable and by inflight batch commit
75 : // operations. This value is incremented pessimistically by prepare() in
76 : // order to account for the space needed by a batch.
77 : reserved uint32
78 : // writerRefs tracks the write references on the memtable. The two sources of
79 : // writer references are the memtable being on DB.mu.mem.queue and from
80 : // inflight mutations that have reserved space in the memtable but not yet
81 : // applied. The memtable cannot be flushed to disk until the writer refs
82 : // drops to zero.
83 : writerRefs atomic.Int32
84 : tombstones keySpanCache
85 : rangeKeys keySpanCache
86 : // The current logSeqNum at the time the memtable was created. This is
87 : // guaranteed to be less than or equal to any seqnum stored in the memtable.
88 : logSeqNum base.SeqNum
89 : releaseAccountingReservation func()
90 : }
91 :
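     : // free releases the memtable's manually managed arena memory (and the
     : // associated accounting reservation), zeroing arenaBuf so that checkMemTable
     : // below can distinguish freed memtables from leaked ones.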
92 1 : func (m *memTable) free() {
93 1 : if m != nil {
94 1 : m.releaseAccountingReservation()
95 1 : manual.Free(manual.MemTable, m.arenaBuf)
96 1 : m.arenaBuf = manual.Buf{}
97 1 : }
98 : }
99 :
100 : // memTableOptions holds configuration used when creating a memTable. All of
101 : // the fields are optional and will be filled with defaults if not specified,
102 : // which is convenient for tests.
103 : type memTableOptions struct {
104 : *Options
105 : arenaBuf manual.Buf
106 : size int
107 : logSeqNum base.SeqNum
108 : releaseAccountingReservation func()
109 : }
110 :
111 1 : func checkMemTable(obj interface{}) {
112 1 : m := obj.(*memTable)
113 1 : if m.arenaBuf.Data() != nil {
114 0 : fmt.Fprintf(os.Stderr, "%v: memTable buffer was not freed\n", m.arenaBuf)
115 0 : os.Exit(1)
116 0 : }
117 : }
118 :
119 : // newMemTable returns a new memTable of the specified size. If size is zero,
120 : // Options.MemTableSize is used instead.
121 0 : func newMemTable(opts memTableOptions) *memTable {
122 0 : if opts.Options == nil {
123 0 : opts.Options = &Options{}
124 0 : }
125 0 : opts.Options.EnsureDefaults()
126 0 : m := new(memTable)
127 0 : m.init(opts)
128 0 : return m
129 : }
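     :
     : // For illustration only (a sketch, not part of the documented API): a test
     : // can construct a small standalone memtable and scan it with an unpositioned
     : // iterator. The 64 KB size below is an arbitrary value for the example.
     : //
     : //	m := newMemTable(memTableOptions{size: 64 << 10})
     : //	it := m.newIter(&IterOptions{})
     : //	for kv := it.First(); kv != nil; kv = it.Next() {
     : //		// A freshly created memtable is empty, so this body never runs.
     : //	}
     : //	_ = it.Close()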
130 :
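     : // init initializes the memtable, allocating the backing arena buffer if one
     : // was not supplied. All three skiplists (points, range deletions, and range
     : // keys) are carved out of the single shared arena, which is why one
     : // reserved / availBytes accounting covers every key kind.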
131 1 : func (m *memTable) init(opts memTableOptions) {
132 1 : if opts.size == 0 {
133 1 : opts.size = int(opts.MemTableSize)
134 1 : }
135 1 : *m = memTable{
136 1 : cmp: opts.Comparer.Compare,
137 1 : formatKey: opts.Comparer.FormatKey,
138 1 : equal: opts.Comparer.Equal,
139 1 : arenaBuf: opts.arenaBuf,
140 1 : logSeqNum: opts.logSeqNum,
141 1 : releaseAccountingReservation: opts.releaseAccountingReservation,
142 1 : }
143 1 : m.writerRefs.Store(1)
144 1 : m.tombstones = keySpanCache{
145 1 : cmp: m.cmp,
146 1 : formatKey: m.formatKey,
147 1 : skl: &m.rangeDelSkl,
148 1 : constructSpan: rangeDelConstructSpan,
149 1 : }
150 1 : m.rangeKeys = keySpanCache{
151 1 : cmp: m.cmp,
152 1 : formatKey: m.formatKey,
153 1 : skl: &m.rangeKeySkl,
154 1 : constructSpan: rangekey.Decode,
155 1 : }
156 1 :
157 1 : if m.arenaBuf.Data() == nil {
158 0 : m.arenaBuf = manual.New(manual.MemTable, uintptr(opts.size))
159 0 : }
160 :
161 1 : arena := arenaskl.NewArena(m.arenaBuf.Slice())
162 1 : m.skl.Reset(arena, m.cmp)
163 1 : m.rangeDelSkl.Reset(arena, m.cmp)
164 1 : m.rangeKeySkl.Reset(arena, m.cmp)
165 1 : m.reserved = arena.Size()
166 : }
167 :
168 1 : func (m *memTable) writerRef() {
169 1 : switch v := m.writerRefs.Add(1); {
170 0 : case v <= 1:
171 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
172 : }
173 : }
174 :
175 : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
176 1 : func (m *memTable) writerUnref() (wasLastRef bool) {
177 1 : switch v := m.writerRefs.Add(-1); {
178 0 : case v < 0:
179 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
180 1 : case v == 0:
181 1 : return true
182 1 : default:
183 1 : return false
184 : }
185 : }
186 :
187 : // readyForFlush is part of the flushable interface.
188 1 : func (m *memTable) readyForFlush() bool {
189 1 : return m.writerRefs.Load() == 0
190 1 : }
191 :
192 : // prepare reserves space for the batch in the memtable and references the
193 : // memtable, preventing it from being flushed until the batch is applied. Note
194 : // that prepare is not thread-safe, while apply is. The caller must call
195 : // writerUnref() after the batch has been applied.
196 1 : func (m *memTable) prepare(batch *Batch) error {
197 1 : avail := m.availBytes()
198 1 : if batch.memTableSize > uint64(avail) {
199 1 : return arenaskl.ErrArenaFull
200 1 : }
201 1 : m.reserved += uint32(batch.memTableSize)
202 1 :
203 1 : m.writerRef()
204 1 : return nil
205 : }
206 :
207 1 : func (m *memTable) apply(batch *Batch, seqNum base.SeqNum) error {
208 1 : if seqNum < m.logSeqNum {
209 0 : return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
210 0 : errors.Safe(seqNum), errors.Safe(m.logSeqNum))
211 0 : }
212 :
213 1 : var ins arenaskl.Inserter
214 1 : var tombstoneCount, rangeKeyCount uint32
215 1 : startSeqNum := seqNum
216 1 : for r := batch.Reader(); ; seqNum++ {
217 1 : kind, ukey, value, ok, err := r.Next()
218 1 : if !ok {
219 1 : if err != nil {
220 0 : return err
221 0 : }
222 1 : break
223 : }
224 1 : ikey := base.MakeInternalKey(ukey, seqNum, kind)
225 1 : switch kind {
226 1 : case InternalKeyKindRangeDelete:
227 1 : err = m.rangeDelSkl.Add(ikey, value)
228 1 : tombstoneCount++
229 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
230 1 : err = m.rangeKeySkl.Add(ikey, value)
231 1 : rangeKeyCount++
232 1 : case InternalKeyKindLogData:
233 1 : // LogData records are not applied to the memtable and do not consume a
234 1 : // seqnum, so decrement to cancel out the loop's increment.
235 1 : seqNum--
236 0 : case InternalKeyKindIngestSST, InternalKeyKindExcise:
237 0 : panic("pebble: cannot apply ingested sstable or excise kind keys to memtable")
238 1 : default:
239 1 : err = ins.Add(&m.skl, ikey, value)
240 : }
241 1 : if err != nil {
242 0 : return err
243 0 : }
244 : }
245 1 : if seqNum != startSeqNum+base.SeqNum(batch.Count()) {
246 0 : return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
247 0 : errors.Safe(seqNum), errors.Safe(startSeqNum+base.SeqNum(batch.Count())))
248 0 : }
249 1 : if tombstoneCount != 0 {
250 1 : m.tombstones.invalidate(tombstoneCount)
251 1 : }
252 1 : if rangeKeyCount != 0 {
253 1 : m.rangeKeys.invalidate(rangeKeyCount)
254 1 : }
255 1 : return nil
256 : }
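     :
     : // A rough sketch of the commit flow described in the memTable type comment
     : // (the real sequencing lives in the commitPipeline; memtable rotation and
     : // most error handling are elided):
     : //
     : //	// Serialized by the commit pipeline: reserve space, take a writer ref.
     : //	if err := m.prepare(batch); err != nil {
     : //		// arenaskl.ErrArenaFull: this memtable is full; rotate and retry.
     : //		return err
     : //	}
     : //	// Concurrent with other committers: insert the batch's records.
     : //	err := m.apply(batch, seqNum)
     : //	// Drop the ref taken by prepare; the memtable can only be flushed once
     : //	// all writer refs (including the queue's ref on the mutable memtable)
     : //	// are released.
     : //	if m.writerUnref() {
     : //		// Last writer ref: the memtable is ready to flush.
     : //	}
     : //	return err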
257 :
258 : // newIter is part of the flushable interface. It returns an iterator that is
259 : // unpositioned (Iterator.Valid() will return false). The iterator can be
260 : // positioned via a call to SeekGE, SeekLT, First or Last.
261 1 : func (m *memTable) newIter(o *IterOptions) internalIterator {
262 1 : return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
263 1 : }
264 :
265 : // newFlushIter is part of the flushable interface.
266 1 : func (m *memTable) newFlushIter(o *IterOptions) internalIterator {
267 1 : return m.skl.NewFlushIter()
268 1 : }
269 :
270 : // newRangeDelIter is part of the flushable interface.
271 1 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
272 1 : tombstones := m.tombstones.get()
273 1 : if tombstones == nil {
274 1 : return nil
275 1 : }
276 1 : return keyspan.NewIter(m.cmp, tombstones)
277 : }
278 :
279 : // newRangeKeyIter is part of the flushable interface.
280 1 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
281 1 : rangeKeys := m.rangeKeys.get()
282 1 : if rangeKeys == nil {
283 1 : return nil
284 1 : }
285 1 : return keyspan.NewIter(m.cmp, rangeKeys)
286 : }
287 :
288 : // containsRangeKeys is part of the flushable interface.
289 1 : func (m *memTable) containsRangeKeys() bool {
290 1 : return m.rangeKeys.count.Load() > 0
291 1 : }
292 :
293 1 : func (m *memTable) availBytes() uint32 {
294 1 : a := m.skl.Arena()
295 1 : if m.writerRefs.Load() == 1 {
296 1 : // Note that one ref is maintained as long as the memtable is the
297 1 : // current mutable memtable, so when evaluating whether the current
298 1 : // mutable memtable has sufficient space for committing a batch, it is
299 1 : // guaranteed that m.writerRefs.Load() >= 1. This means a writerRefs value of 1
300 1 : // indicates there are no other concurrent apply operations.
301 1 : //
302 1 : // If there are no other concurrent apply operations, we can update the
303 1 : // reserved bytes setting to accurately reflect how many bytes have been
304 1 : // allocated vs the over-estimation present in memTableEntrySize.
305 1 : m.reserved = a.Size()
306 1 : }
307 1 : return a.Capacity() - m.reserved
308 : }
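     :
     : // To make the accounting concrete: prepare() grows reserved by the batch's
     : // pessimistic memTableSize estimate, while apply() consumes the actual
     : // (typically smaller) amount of arena space. Once writerRefs drops back to
     : // 1 (only the flush queue's ref remains), availBytes resets reserved to
     : // arena.Size(), returning the over-estimate to the available pool.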
309 :
310 : // inuseBytes is part of the flushable interface.
311 1 : func (m *memTable) inuseBytes() uint64 {
312 1 : return uint64(m.skl.Size() - memTableEmptySize)
313 1 : }
314 :
315 : // totalBytes is part of the flushable interface.
316 1 : func (m *memTable) totalBytes() uint64 {
317 1 : return uint64(m.skl.Arena().Capacity())
318 1 : }
319 :
320 : // empty returns whether the MemTable has no key/value pairs.
321 0 : func (m *memTable) empty() bool {
322 0 : return m.skl.Size() == memTableEmptySize
323 0 : }
324 :
325 : // computePossibleOverlaps is part of the flushable interface.
326 1 : func (m *memTable) computePossibleOverlaps(fn func(bounded) shouldContinue, bounded ...bounded) {
327 1 : computePossibleOverlapsGenericImpl[*memTable](m, m.cmp, fn, bounded)
328 1 : }
329 :
330 : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
331 : // kind at a particular moment for a memtable.
332 : //
333 : // When a new span of a particular kind is added to the memtable, it may overlap
334 : // with other spans of the same kind. Instead of performing the fragmentation
335 : // whenever an iterator requires it, fragments are cached within a keySpanCache
336 : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
337 : //
338 : // The count of keys (and keys of any given kind) in a memtable only
339 : // monotonically increases. The count of key spans of a particular kind is used
340 : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
341 : // fragmented state of the memtable's keys of a given kind at the moment when
342 : // there existed `count` keys of that kind in the memtable.
343 : //
344 : // It is used to hold fragmented range deletion tombstones and range keys.
345 : type keySpanFrags struct {
346 : count uint32
347 : once sync.Once
348 : spans []keyspan.Span
349 : }
350 :
351 : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
352 :
353 : func rangeDelConstructSpan(
354 : ik base.InternalKey, v []byte, keysDst []keyspan.Key,
355 1 : ) (keyspan.Span, error) {
356 1 : return rangedel.Decode(ik, v, keysDst), nil
357 1 : }
358 :
359 : // get retrieves the fragmented spans, populating them if necessary. Note that
360 : // the populated span fragments may be built from more than f.count memTable
361 : // spans, but that is ok for correctness. All we're requiring is that the
362 : // memTable contains at least f.count keys of the configured kind. This
363 : // situation can occur if there are multiple concurrent additions of the key
364 : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
365 : // it even though it has been invalidated (i.e. replaced with a newer
366 : // keySpanFrags).
367 : func (f *keySpanFrags) get(
368 : skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
369 1 : ) []keyspan.Span {
370 1 : f.once.Do(func() {
371 1 : frag := &keyspan.Fragmenter{
372 1 : Cmp: cmp,
373 1 : Format: formatKey,
374 1 : Emit: func(fragmented keyspan.Span) {
375 1 : f.spans = append(f.spans, fragmented)
376 1 : },
377 : }
378 1 : it := skl.NewIter(nil, nil)
379 1 : var keysDst []keyspan.Key
380 1 : for kv := it.First(); kv != nil; kv = it.Next() {
381 1 : s, err := constructSpan(kv.K, kv.InPlaceValue(), keysDst)
382 1 : if err != nil {
383 0 : panic(err)
384 : }
385 1 : frag.Add(s)
386 1 : keysDst = s.Keys[len(s.Keys):]
387 : }
388 1 : frag.Finish()
389 : })
390 1 : return f.spans
391 : }
392 :
393 : // A keySpanCache is used to cache a set of fragmented spans. The cache is
394 : // invalidated whenever a key of the same kind is added to a memTable, and
395 : // populated, if empty, when a span iterator of that key kind is created.
396 : type keySpanCache struct {
397 : count atomic.Uint32
398 : frags atomic.Pointer[keySpanFrags]
399 : cmp Compare
400 : formatKey base.FormatKey
401 : constructSpan constructSpan
402 : skl *arenaskl.Skiplist
403 : }
404 :
405 : // Invalidate the current set of cached spans, indicating the number of
406 : // spans that were added.
407 1 : func (c *keySpanCache) invalidate(count uint32) {
408 1 : newCount := c.count.Add(count)
409 1 : var frags *keySpanFrags
410 1 :
411 1 : for {
412 1 : oldFrags := c.frags.Load()
413 1 : if oldFrags != nil && oldFrags.count >= newCount {
414 0 : // Someone else invalidated the cache before us and their invalidation
415 0 : // subsumes ours.
416 0 : break
417 : }
418 1 : if frags == nil {
419 1 : frags = &keySpanFrags{count: newCount}
420 1 : }
421 1 : if c.frags.CompareAndSwap(oldFrags, frags) {
422 1 : // We successfully invalidated the cache.
423 1 : break
424 : }
425 : // Someone else invalidated the cache. Loop and try again.
426 : }
427 : }
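     :
     : // Putting invalidate and get together (illustrative only): apply() publishes
     : // a fresh, unpopulated keySpanFrags via invalidate(), and a later
     : // newRangeDelIter/newRangeKeyIter call triggers the lazy, once-only
     : // fragmentation via get():
     : //
     : //	m.tombstones.invalidate(tombstoneCount)   // in apply(), after adding spans
     : //	spans := m.tombstones.get()               // a reader fragments lazily, once
     : //	if spans != nil {
     : //		_ = keyspan.NewIter(m.cmp, spans) // as in newRangeDelIter above
     : //	}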
428 :
429 1 : func (c *keySpanCache) get() []keyspan.Span {
430 1 : frags := c.frags.Load()
431 1 : if frags == nil {
432 1 : return nil
433 1 : }
434 1 : return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
435 : }