Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "os"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/arenaskl"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/manual"
19 : "github.com/cockroachdb/pebble/internal/rangedel"
20 : "github.com/cockroachdb/pebble/internal/rangekey"
21 : )
22 :
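: // memTableEntrySize returns a conservative estimate of the arena space
: // consumed by a single memtable entry: a skiplist node holding a key of
: // keyBytes bytes (plus the 8-byte internal key trailer) and a value of
: // valueBytes bytes.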
23 2 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
24 2 : return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
25 2 : }
26 :
27 : // memTableEmptySize is the amount of allocated space in the arena when the
28 : // memtable is empty.
29 2 : var memTableEmptySize = func() uint32 {
30 2 : var pointSkl arenaskl.Skiplist
31 2 : var rangeDelSkl arenaskl.Skiplist
32 2 : var rangeKeySkl arenaskl.Skiplist
33 2 : arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
34 2 : pointSkl.Reset(arena, bytes.Compare)
35 2 : rangeDelSkl.Reset(arena, bytes.Compare)
36 2 : rangeKeySkl.Reset(arena, bytes.Compare)
37 2 : return arena.Size()
38 2 : }()
39 :
40 : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
41 : // but append-only. Records are added, but never removed. Deletion is supported
42 : // via tombstones, but it is up to higher-level code (see Iterator) to
43 : // process those tombstones.
44 : //
45 : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
46 : // arena is a fixed size contiguous chunk of memory (see
47 : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
48 : // time of creation (with the exception of the cached fragmented range
49 : // tombstones). The arena-backed skiplist provides both forward and reverse
50 : // links which makes forward and reverse iteration the same speed.
51 : //
52 : // A batch is "applied" to a memTable in a two-step process: prepare(batch) ->
53 : // apply(batch). memTable.prepare() is not thread-safe and must be called with
54 : // external synchronization. Preparation reserves space in the memTable for the
55 : // batch. Note that we pessimistically compute how much space a batch will
56 : // consume in the memTable (see memTableEntrySize and
57 : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
58 : // the memTable can be performed concurrently with other apply
59 : // operations. Applying a batch is an O(N log M) operation where N is the number
60 : // of records in the batch and M is the number of records in the memtable. The
61 : // commitPipeline serializes batch preparation, and allows batch application to
62 : // proceed concurrently.
63 : //
64 : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
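: //
: // A minimal sketch of the commit flow described above (illustrative only;
: // in practice the commitPipeline drives these calls and provides the
: // synchronization that prepare requires):
: //
: //	if err := m.prepare(batch); err != nil {
: //		// Likely arenaskl.ErrArenaFull: rotate to a new memtable and retry.
: //	}
: //	// apply may run concurrently with other applies.
: //	if err := m.apply(batch, seqNum); err != nil {
: //		// Handle the error.
: //	}
: //	// Drop the writer ref taken by prepare; the memtable becomes
: //	// flushable once its writer ref count reaches zero.
: //	m.writerUnref()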
65 : type memTable struct {
66 : cmp Compare
67 : formatKey base.FormatKey
68 : equal Equal
69 : arenaBuf []byte
70 : skl arenaskl.Skiplist
71 : rangeDelSkl arenaskl.Skiplist
72 : rangeKeySkl arenaskl.Skiplist
73 : // reserved tracks the amount of space used by the memtable, both by actual
74 : // data stored in the memtable and by inflight batch commit
75 : // operations. This value is incremented pessimistically by prepare() in
76 : // order to account for the space needed by a batch.
77 : reserved uint32
78 : // writerRefs tracks the write references on the memtable. The two sources of
79 : // writer references are the memtable being on DB.mu.mem.queue and inflight
80 : // mutations that have reserved space in the memtable but have not yet been
81 : // applied. The memtable cannot be flushed to disk until the writer refs
82 : // drop to zero.
83 : writerRefs atomic.Int32
84 : tombstones keySpanCache
85 : rangeKeys keySpanCache
86 : // The current logSeqNum at the time the memtable was created. This is
87 : // guaranteed to be less than or equal to any seqnum stored in the memtable.
88 : logSeqNum uint64
89 : releaseAccountingReservation func()
90 : }
91 :
92 2 : func (m *memTable) free() {
93 2 : if m != nil {
94 2 : m.releaseAccountingReservation()
95 2 : manual.Free(m.arenaBuf)
96 2 : m.arenaBuf = nil
97 2 : }
98 : }
99 :
100 : // memTableOptions holds configuration used when creating a memTable. All of
101 : // the fields are optional and will be filled with defaults if not specified,
102 : // which is convenient for tests.
103 : type memTableOptions struct {
104 : *Options
105 : arenaBuf []byte
106 : size int
107 : logSeqNum uint64
108 : releaseAccountingReservation func()
109 : }
110 :
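: // checkMemTable verifies that a memTable's manually allocated arena buffer
: // was freed before the memTable was discarded; it is intended to be used as
: // a leak check (e.g. registered as an object finalizer in test builds).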
111 2 : func checkMemTable(obj interface{}) {
112 2 : m := obj.(*memTable)
113 2 : if m.arenaBuf != nil {
114 0 : fmt.Fprintf(os.Stderr, "%p: memTable buffer was not freed\n", m.arenaBuf)
115 0 : os.Exit(1)
116 0 : }
117 : }
118 :
119 : // newMemTable returns a new MemTable of the specified size. If size is zero,
120 : // Options.MemTableSize is used instead.
121 1 : func newMemTable(opts memTableOptions) *memTable {
122 1 : opts.Options = opts.Options.EnsureDefaults()
123 1 : m := new(memTable)
124 1 : m.init(opts)
125 1 : return m
126 1 : }
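:
: // A test-style usage sketch (the size below is illustrative; omitted
: // memTableOptions fields fall back to defaults):
: //
: //	m := newMemTable(memTableOptions{size: 64 << 10})
: //	_ = m.empty() // true until the first entry is applied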
127 :
128 2 : func (m *memTable) init(opts memTableOptions) {
129 2 : if opts.size == 0 {
130 2 : opts.size = int(opts.MemTableSize)
131 2 : }
132 2 : *m = memTable{
133 2 : cmp: opts.Comparer.Compare,
134 2 : formatKey: opts.Comparer.FormatKey,
135 2 : equal: opts.Comparer.Equal,
136 2 : arenaBuf: opts.arenaBuf,
137 2 : logSeqNum: opts.logSeqNum,
138 2 : releaseAccountingReservation: opts.releaseAccountingReservation,
139 2 : }
140 2 : m.writerRefs.Store(1)
141 2 : m.tombstones = keySpanCache{
142 2 : cmp: m.cmp,
143 2 : formatKey: m.formatKey,
144 2 : skl: &m.rangeDelSkl,
145 2 : constructSpan: rangeDelConstructSpan,
146 2 : }
147 2 : m.rangeKeys = keySpanCache{
148 2 : cmp: m.cmp,
149 2 : formatKey: m.formatKey,
150 2 : skl: &m.rangeKeySkl,
151 2 : constructSpan: rangekey.Decode,
152 2 : }
153 2 :
154 2 : if m.arenaBuf == nil {
155 1 : m.arenaBuf = make([]byte, opts.size)
156 1 : }
157 :
158 2 : arena := arenaskl.NewArena(m.arenaBuf)
159 2 : m.skl.Reset(arena, m.cmp)
160 2 : m.rangeDelSkl.Reset(arena, m.cmp)
161 2 : m.rangeKeySkl.Reset(arena, m.cmp)
162 2 : m.reserved = arena.Size()
163 : }
164 :
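: // writerRef adds a write reference to the memtable. It panics if the
: // reference count was not already positive, since that would mean the
: // memtable may already have been handed off for flushing.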
165 2 : func (m *memTable) writerRef() {
166 2 : switch v := m.writerRefs.Add(1); {
167 0 : case v <= 1:
168 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
169 : }
170 : }
171 :
172 : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
173 2 : func (m *memTable) writerUnref() (wasLastRef bool) {
174 2 : switch v := m.writerRefs.Add(-1); {
175 0 : case v < 0:
176 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
177 2 : case v == 0:
178 2 : return true
179 2 : default:
180 2 : return false
181 : }
182 : }
183 :
184 : // readyForFlush is part of the flushable interface.
185 2 : func (m *memTable) readyForFlush() bool {
186 2 : return m.writerRefs.Load() == 0
187 2 : }
188 :
189 : // prepare reserves space for the batch in the memtable and references the
190 : // memtable, preventing it from being flushed until the batch is applied. Note
191 : // that prepare is not thread-safe, while apply is. The caller must call
192 : // writerUnref() after the batch has been applied.
193 2 : func (m *memTable) prepare(batch *Batch) error {
194 2 : avail := m.availBytes()
195 2 : if batch.memTableSize > uint64(avail) {
196 2 : return arenaskl.ErrArenaFull
197 2 : }
198 2 : m.reserved += uint32(batch.memTableSize)
199 2 :
200 2 : m.writerRef()
201 2 : return nil
202 : }
203 :
204 2 : func (m *memTable) apply(batch *Batch, seqNum uint64) error {
205 2 : if seqNum < m.logSeqNum {
206 0 : return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
207 0 : errors.Safe(seqNum), errors.Safe(m.logSeqNum))
208 0 : }
209 :
210 2 : var ins arenaskl.Inserter
211 2 : var tombstoneCount, rangeKeyCount uint32
212 2 : startSeqNum := seqNum
213 2 : for r := batch.Reader(); ; seqNum++ {
214 2 : kind, ukey, value, ok, err := r.Next()
215 2 : if !ok {
216 2 : if err != nil {
217 0 : return err
218 0 : }
219 2 : break
220 : }
221 2 : ikey := base.MakeInternalKey(ukey, seqNum, kind)
222 2 : switch kind {
223 2 : case InternalKeyKindRangeDelete:
224 2 : err = m.rangeDelSkl.Add(ikey, value)
225 2 : tombstoneCount++
226 2 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
227 2 : err = m.rangeKeySkl.Add(ikey, value)
228 2 : rangeKeyCount++
229 1 : case InternalKeyKindLogData:
230 1 : // Don't increment seqNum for LogData, since these are not applied
231 1 : // to the memtable.
232 1 : seqNum--
233 0 : case InternalKeyKindIngestSST:
234 0 : panic("pebble: cannot apply ingested sstable key kind to memtable")
235 2 : default:
236 2 : err = ins.Add(&m.skl, ikey, value)
237 : }
238 2 : if err != nil {
239 0 : return err
240 0 : }
241 : }
242 2 : if seqNum != startSeqNum+uint64(batch.Count()) {
243 0 : return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
244 0 : errors.Safe(seqNum), errors.Safe(startSeqNum+uint64(batch.Count())))
245 0 : }
246 2 : if tombstoneCount != 0 {
247 2 : m.tombstones.invalidate(tombstoneCount)
248 2 : }
249 2 : if rangeKeyCount != 0 {
250 2 : m.rangeKeys.invalidate(rangeKeyCount)
251 2 : }
252 2 : return nil
253 : }
254 :
255 : // newIter is part of the flushable interface. It returns an iterator that is
256 : // unpositioned (Iterator.Valid() will return false). The iterator can be
257 : // positioned via a call to SeekGE, SeekLT, First or Last.
258 2 : func (m *memTable) newIter(o *IterOptions) internalIterator {
259 2 : return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
260 2 : }
261 :
262 : // newFlushIter is part of the flushable interface.
263 2 : func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
264 2 : return m.skl.NewFlushIter(bytesFlushed)
265 2 : }
266 :
267 : // newRangeDelIter is part of the flushable interface.
268 2 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
269 2 : tombstones := m.tombstones.get()
270 2 : if tombstones == nil {
271 2 : return nil
272 2 : }
273 2 : return keyspan.NewIter(m.cmp, tombstones)
274 : }
275 :
276 : // newRangeKeyIter is part of the flushable interface.
277 2 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
278 2 : rangeKeys := m.rangeKeys.get()
279 2 : if rangeKeys == nil {
280 2 : return nil
281 2 : }
282 2 : return keyspan.NewIter(m.cmp, rangeKeys)
283 : }
284 :
285 : // containsRangeKeys is part of the flushable interface.
286 2 : func (m *memTable) containsRangeKeys() bool {
287 2 : return m.rangeKeys.count.Load() > 0
288 2 : }
289 :
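: // availBytes returns the number of arena bytes that are neither occupied by
: // stored entries nor reserved by inflight batches via prepare().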
290 2 : func (m *memTable) availBytes() uint32 {
291 2 : a := m.skl.Arena()
292 2 : if m.writerRefs.Load() == 1 {
293 2 : // If there are no other concurrent apply operations, we can update the
294 2 : // reserved bytes setting to accurately reflect how many bytes have been
295 2 : // allocated vs the over-estimation present in memTableEntrySize.
296 2 : m.reserved = a.Size()
297 2 : }
298 2 : return a.Capacity() - m.reserved
299 : }
300 :
301 : // inuseBytes is part of the flushable interface.
302 2 : func (m *memTable) inuseBytes() uint64 {
303 2 : return uint64(m.skl.Size() - memTableEmptySize)
304 2 : }
305 :
306 : // totalBytes is part of the flushable interface.
307 2 : func (m *memTable) totalBytes() uint64 {
308 2 : return uint64(m.skl.Arena().Capacity())
309 2 : }
310 :
311 : // empty returns whether the MemTable has no key/value pairs.
312 1 : func (m *memTable) empty() bool {
313 1 : return m.skl.Size() == memTableEmptySize
314 1 : }
315 :
316 : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
317 : // kind at a particular moment for a memtable.
318 : //
319 : // When a new span of a particular kind is added to the memtable, it may overlap
320 : // with other spans of the same kind. Instead of performing the fragmentation
321 : // whenever an iterator requires it, fragments are cached within a keySpanCache
322 : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
323 : //
324 : // The count of keys (and keys of any given kind) in a memtable only
325 : // monotonically increases. The count of key spans of a particular kind is used
326 : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
327 : // fragmented state of the memtable's keys of a given kind at the moment when
328 : // there existed `count` keys of that kind in the memtable.
329 : //
330 : // It's used to cache fragmented range deletion tombstones and range keys.
331 : type keySpanFrags struct {
332 : count uint32
333 : once sync.Once
334 : spans []keyspan.Span
335 : }
336 :
337 : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
338 :
339 : func rangeDelConstructSpan(
340 : ik base.InternalKey, v []byte, keysDst []keyspan.Key,
341 2 : ) (keyspan.Span, error) {
342 2 : return rangedel.Decode(ik, v, keysDst), nil
343 2 : }
344 :
345 : // get retrieves the fragmented spans, populating them if necessary. Note that
346 : // the populated span fragments may be built from more than f.count memTable
347 : // spans, but that is ok for correctness. All we're requiring is that the
348 : // memTable contains at least f.count keys of the configured kind. This
349 : // situation can occur if there are multiple concurrent additions of the key
350 : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
351 : // it even though it has been invalidated (i.e. replaced with a newer
352 : // keySpanFrags).
353 : func (f *keySpanFrags) get(
354 : skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
355 2 : ) []keyspan.Span {
356 2 : f.once.Do(func() {
357 2 : frag := &keyspan.Fragmenter{
358 2 : Cmp: cmp,
359 2 : Format: formatKey,
360 2 : Emit: func(fragmented keyspan.Span) {
361 2 : f.spans = append(f.spans, fragmented)
362 2 : },
363 : }
364 2 : it := skl.NewIter(nil, nil)
365 2 : var keysDst []keyspan.Key
366 2 : for key, val := it.First(); key != nil; key, val = it.Next() {
367 2 : s, err := constructSpan(*key, val.InPlaceValue(), keysDst)
368 2 : if err != nil {
369 0 : panic(err)
370 : }
371 2 : frag.Add(s)
372 2 : keysDst = s.Keys[len(s.Keys):]
373 : }
374 2 : frag.Finish()
375 : })
376 2 : return f.spans
377 : }
378 :
379 : // A keySpanCache is used to cache a set of fragmented spans. The cache is
380 : // invalidated whenever a key of the same kind is added to a memTable, and
381 : // populated, if empty, when a span iterator of that key kind is created.
382 : type keySpanCache struct {
383 : count atomic.Uint32
384 : frags atomic.Pointer[keySpanFrags]
385 : cmp Compare
386 : formatKey base.FormatKey
387 : constructSpan constructSpan
388 : skl *arenaskl.Skiplist
389 : }
390 :
391 : // Invalidate the current set of cached spans, indicating the number of
392 : // spans that were added.
393 2 : func (c *keySpanCache) invalidate(count uint32) {
394 2 : newCount := c.count.Add(count)
395 2 : var frags *keySpanFrags
396 2 :
397 2 : for {
398 2 : oldFrags := c.frags.Load()
399 2 : if oldFrags != nil && oldFrags.count >= newCount {
400 1 : // Someone else invalidated the cache before us and their invalidation
401 1 : // subsumes ours.
402 1 : break
403 : }
404 2 : if frags == nil {
405 2 : frags = &keySpanFrags{count: newCount}
406 2 : }
407 2 : if c.frags.CompareAndSwap(oldFrags, frags) {
408 2 : // We successfully invalidated the cache.
409 2 : break
410 : }
411 : // Someone else invalidated the cache. Loop and try again.
412 : }
413 : }
414 :
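: // get returns the cached fragmented spans, constructing them lazily from
: // the backing skiplist on first use. It returns nil if the cache has never
: // been invalidated, i.e. no keys of this kind have been added yet.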
415 2 : func (c *keySpanCache) get() []keyspan.Span {
416 2 : frags := c.frags.Load()
417 2 : if frags == nil {
418 2 : return nil
419 2 : }
420 2 : return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
421 : }