Line data Source code
1 : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "os"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/arenaskl"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/manual"
19 : "github.com/cockroachdb/pebble/internal/rangedel"
20 : "github.com/cockroachdb/pebble/internal/rangekey"
21 : )
22 :
23 1 : func memTableEntrySize(keyBytes, valueBytes int) uint64 {
24 1 : return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
25 1 : }
26 :
27 : // memTableEmptySize is the amount of allocated space in the arena when the
28 : // memtable is empty.
29 1 : var memTableEmptySize = func() uint32 {
30 1 : var pointSkl arenaskl.Skiplist
31 1 : var rangeDelSkl arenaskl.Skiplist
32 1 : var rangeKeySkl arenaskl.Skiplist
33 1 : arena := arenaskl.NewArena(make([]byte, 16<<10 /* 16 KB */))
34 1 : pointSkl.Reset(arena, bytes.Compare)
35 1 : rangeDelSkl.Reset(arena, bytes.Compare)
36 1 : rangeKeySkl.Reset(arena, bytes.Compare)
37 1 : return arena.Size()
38 1 : }()
39 :
40 : // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
41 : // but append-only. Records are added, but never removed. Deletion is supported
42 : // via tombstones, but it is up to higher level code (see Iterator) to support
43 : // processing those tombstones.
44 : //
45 : // A memTable is implemented on top of a lock-free arena-backed skiplist. An
46 : // arena is a fixed size contiguous chunk of memory (see
47 : // Options.MemTableSize). A memTable's memory consumption is thus fixed at the
48 : // time of creation (with the exception of the cached fragmented range
49 : // tombstones). The arena-backed skiplist provides both forward and reverse
50 : // links which makes forward and reverse iteration the same speed.
51 : //
52 : // A batch is "applied" to a memTable in a two step process: prepare(batch) ->
53 : // apply(batch). memTable.prepare() is not thread-safe and must be called with
54 : // external synchronization. Preparation reserves space in the memTable for the
55 : // batch. Note that we pessimistically compute how much space a batch will
56 : // consume in the memTable (see memTableEntrySize and
57 : // Batch.memTableSize). Preparation is an O(1) operation. Applying a batch to
58 : // the memTable can be performed concurrently with other apply
59 : // operations. Applying a batch is an O(n logm) operation where N is the number
60 : // of records in the batch and M is the number of records in the memtable. The
61 : // commitPipeline serializes batch preparation, and allows batch application to
62 : // proceed concurrently.
63 : //
64 : // It is safe to call get, apply, newIter, and newRangeDelIter concurrently.
65 : type memTable struct {
66 : cmp Compare
67 : formatKey base.FormatKey
68 : equal Equal
69 : arenaBuf []byte
70 : skl arenaskl.Skiplist
71 : rangeDelSkl arenaskl.Skiplist
72 : rangeKeySkl arenaskl.Skiplist
73 : // reserved tracks the amount of space used by the memtable, both by actual
74 : // data stored in the memtable as well as inflight batch commit
75 : // operations. This value is incremented pessimistically by prepare() in
76 : // order to account for the space needed by a batch.
77 : reserved uint32
78 : // writerRefs tracks the write references on the memtable. The two sources of
79 : // writer references are the memtable being on DB.mu.mem.queue and from
80 : // inflight mutations that have reserved space in the memtable but not yet
81 : // applied. The memtable cannot be flushed to disk until the writer refs
82 : // drops to zero.
83 : writerRefs atomic.Int32
84 : tombstones keySpanCache
85 : rangeKeys keySpanCache
86 : // The current logSeqNum at the time the memtable was created. This is
87 : // guaranteed to be less than or equal to any seqnum stored in the memtable.
88 : logSeqNum uint64
89 : releaseAccountingReservation func()
90 : }
91 :
92 1 : func (m *memTable) free() {
93 1 : if m != nil {
94 1 : m.releaseAccountingReservation()
95 1 : manual.Free(m.arenaBuf)
96 1 : m.arenaBuf = nil
97 1 : }
98 : }
99 :
100 : // memTableOptions holds configuration used when creating a memTable. All of
101 : // the fields are optional and will be filled with defaults if not specified
102 : // which is used by tests.
103 : type memTableOptions struct {
104 : *Options
105 : arenaBuf []byte
106 : size int
107 : logSeqNum uint64
108 : releaseAccountingReservation func()
109 : }
110 :
111 1 : func checkMemTable(obj interface{}) {
112 1 : m := obj.(*memTable)
113 1 : if m.arenaBuf != nil {
114 0 : fmt.Fprintf(os.Stderr, "%p: memTable buffer was not freed\n", m.arenaBuf)
115 0 : os.Exit(1)
116 0 : }
117 : }
118 :
119 : // newMemTable returns a new MemTable of the specified size. If size is zero,
120 : // Options.MemTableSize is used instead.
121 1 : func newMemTable(opts memTableOptions) *memTable {
122 1 : opts.Options = opts.Options.EnsureDefaults()
123 1 : m := new(memTable)
124 1 : m.init(opts)
125 1 : return m
126 1 : }
127 :
128 1 : func (m *memTable) init(opts memTableOptions) {
129 1 : if opts.size == 0 {
130 1 : opts.size = int(opts.MemTableSize)
131 1 : }
132 1 : *m = memTable{
133 1 : cmp: opts.Comparer.Compare,
134 1 : formatKey: opts.Comparer.FormatKey,
135 1 : equal: opts.Comparer.Equal,
136 1 : arenaBuf: opts.arenaBuf,
137 1 : logSeqNum: opts.logSeqNum,
138 1 : releaseAccountingReservation: opts.releaseAccountingReservation,
139 1 : }
140 1 : m.writerRefs.Store(1)
141 1 : m.tombstones = keySpanCache{
142 1 : cmp: m.cmp,
143 1 : formatKey: m.formatKey,
144 1 : skl: &m.rangeDelSkl,
145 1 : constructSpan: rangeDelConstructSpan,
146 1 : }
147 1 : m.rangeKeys = keySpanCache{
148 1 : cmp: m.cmp,
149 1 : formatKey: m.formatKey,
150 1 : skl: &m.rangeKeySkl,
151 1 : constructSpan: rangekey.Decode,
152 1 : }
153 1 :
154 1 : if m.arenaBuf == nil {
155 1 : m.arenaBuf = make([]byte, opts.size)
156 1 : }
157 :
158 1 : arena := arenaskl.NewArena(m.arenaBuf)
159 1 : m.skl.Reset(arena, m.cmp)
160 1 : m.rangeDelSkl.Reset(arena, m.cmp)
161 1 : m.rangeKeySkl.Reset(arena, m.cmp)
162 1 : m.reserved = arena.Size()
163 : }
164 :
165 1 : func (m *memTable) writerRef() {
166 1 : switch v := m.writerRefs.Add(1); {
167 0 : case v <= 1:
168 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
169 : }
170 : }
171 :
172 : // writerUnref drops a ref on the memtable. Returns true if this was the last ref.
173 1 : func (m *memTable) writerUnref() (wasLastRef bool) {
174 1 : switch v := m.writerRefs.Add(-1); {
175 0 : case v < 0:
176 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
177 1 : case v == 0:
178 1 : return true
179 1 : default:
180 1 : return false
181 : }
182 : }
183 :
184 : // readyForFlush is part of the flushable interface.
185 1 : func (m *memTable) readyForFlush() bool {
186 1 : return m.writerRefs.Load() == 0
187 1 : }
188 :
189 : // Prepare reserves space for the batch in the memtable and references the
190 : // memtable preventing it from being flushed until the batch is applied. Note
191 : // that prepare is not thread-safe, while apply is. The caller must call
192 : // writerUnref() after the batch has been applied.
193 1 : func (m *memTable) prepare(batch *Batch) error {
194 1 : avail := m.availBytes()
195 1 : if batch.memTableSize > uint64(avail) {
196 1 : return arenaskl.ErrArenaFull
197 1 : }
198 1 : m.reserved += uint32(batch.memTableSize)
199 1 :
200 1 : m.writerRef()
201 1 : return nil
202 : }
203 :
204 1 : func (m *memTable) apply(batch *Batch, seqNum uint64) error {
205 1 : if seqNum < m.logSeqNum {
206 0 : return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d",
207 0 : errors.Safe(seqNum), errors.Safe(m.logSeqNum))
208 0 : }
209 :
210 1 : var ins arenaskl.Inserter
211 1 : var tombstoneCount, rangeKeyCount uint32
212 1 : startSeqNum := seqNum
213 1 : for r := batch.Reader(); ; seqNum++ {
214 1 : kind, ukey, value, ok := r.Next()
215 1 : if !ok {
216 1 : break
217 : }
218 1 : var err error
219 1 : ikey := base.MakeInternalKey(ukey, seqNum, kind)
220 1 : switch kind {
221 1 : case InternalKeyKindRangeDelete:
222 1 : err = m.rangeDelSkl.Add(ikey, value)
223 1 : tombstoneCount++
224 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
225 1 : err = m.rangeKeySkl.Add(ikey, value)
226 1 : rangeKeyCount++
227 1 : case InternalKeyKindLogData:
228 1 : // Don't increment seqNum for LogData, since these are not applied
229 1 : // to the memtable.
230 1 : seqNum--
231 0 : case InternalKeyKindIngestSST:
232 0 : panic("pebble: cannot apply ingested sstable key kind to memtable")
233 1 : default:
234 1 : err = ins.Add(&m.skl, ikey, value)
235 : }
236 1 : if err != nil {
237 0 : return err
238 0 : }
239 : }
240 1 : if seqNum != startSeqNum+uint64(batch.Count()) {
241 0 : return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d",
242 0 : errors.Safe(seqNum), errors.Safe(startSeqNum+uint64(batch.Count())))
243 0 : }
244 1 : if tombstoneCount != 0 {
245 1 : m.tombstones.invalidate(tombstoneCount)
246 1 : }
247 1 : if rangeKeyCount != 0 {
248 1 : m.rangeKeys.invalidate(rangeKeyCount)
249 1 : }
250 1 : return nil
251 : }
252 :
253 : // newIter is part of the flushable interface. It returns an iterator that is
254 : // unpositioned (Iterator.Valid() will return false). The iterator can be
255 : // positioned via a call to SeekGE, SeekLT, First or Last.
256 1 : func (m *memTable) newIter(o *IterOptions) internalIterator {
257 1 : return m.skl.NewIter(o.GetLowerBound(), o.GetUpperBound())
258 1 : }
259 :
260 : // newFlushIter is part of the flushable interface.
261 1 : func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
262 1 : return m.skl.NewFlushIter(bytesFlushed)
263 1 : }
264 :
265 : // newRangeDelIter is part of the flushable interface.
266 1 : func (m *memTable) newRangeDelIter(*IterOptions) keyspan.FragmentIterator {
267 1 : tombstones := m.tombstones.get()
268 1 : if tombstones == nil {
269 1 : return nil
270 1 : }
271 1 : return keyspan.NewIter(m.cmp, tombstones)
272 : }
273 :
274 : // newRangeKeyIter is part of the flushable interface.
275 1 : func (m *memTable) newRangeKeyIter(*IterOptions) keyspan.FragmentIterator {
276 1 : rangeKeys := m.rangeKeys.get()
277 1 : if rangeKeys == nil {
278 1 : return nil
279 1 : }
280 1 : return keyspan.NewIter(m.cmp, rangeKeys)
281 : }
282 :
283 : // containsRangeKeys is part of the flushable interface.
284 1 : func (m *memTable) containsRangeKeys() bool {
285 1 : return m.rangeKeys.count.Load() > 0
286 1 : }
287 :
288 1 : func (m *memTable) availBytes() uint32 {
289 1 : a := m.skl.Arena()
290 1 : if m.writerRefs.Load() == 1 {
291 1 : // If there are no other concurrent apply operations, we can update the
292 1 : // reserved bytes setting to accurately reflect how many bytes of been
293 1 : // allocated vs the over-estimation present in memTableEntrySize.
294 1 : m.reserved = a.Size()
295 1 : }
296 1 : return a.Capacity() - m.reserved
297 : }
298 :
299 : // inuseBytes is part of the flushable interface.
300 1 : func (m *memTable) inuseBytes() uint64 {
301 1 : return uint64(m.skl.Size() - memTableEmptySize)
302 1 : }
303 :
304 : // totalBytes is part of the flushable interface.
305 1 : func (m *memTable) totalBytes() uint64 {
306 1 : return uint64(m.skl.Arena().Capacity())
307 1 : }
308 :
309 : // empty returns whether the MemTable has no key/value pairs.
310 1 : func (m *memTable) empty() bool {
311 1 : return m.skl.Size() == memTableEmptySize
312 1 : }
313 :
314 : // A keySpanFrags holds a set of fragmented keyspan.Spans with a particular key
315 : // kind at a particular moment for a memtable.
316 : //
317 : // When a new span of a particular kind is added to the memtable, it may overlap
318 : // with other spans of the same kind. Instead of performing the fragmentation
319 : // whenever an iterator requires it, fragments are cached within a keySpanCache
320 : // type. The keySpanCache uses keySpanFrags to hold the cached fragmented spans.
321 : //
322 : // The count of keys (and keys of any given kind) in a memtable only
323 : // monotonically increases. The count of key spans of a particular kind is used
324 : // as a stand-in for a 'sequence number'. A keySpanFrags represents the
325 : // fragmented state of the memtable's keys of a given kind at the moment while
326 : // there existed `count` keys of that kind in the memtable.
327 : //
328 : // It's currently only used to contain fragmented range deletion tombstones.
329 : type keySpanFrags struct {
330 : count uint32
331 : once sync.Once
332 : spans []keyspan.Span
333 : }
334 :
335 : type constructSpan func(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error)
336 :
337 : func rangeDelConstructSpan(
338 : ik base.InternalKey, v []byte, keysDst []keyspan.Key,
339 1 : ) (keyspan.Span, error) {
340 1 : return rangedel.Decode(ik, v, keysDst), nil
341 1 : }
342 :
343 : // get retrieves the fragmented spans, populating them if necessary. Note that
344 : // the populated span fragments may be built from more than f.count memTable
345 : // spans, but that is ok for correctness. All we're requiring is that the
346 : // memTable contains at least f.count keys of the configured kind. This
347 : // situation can occur if there are multiple concurrent additions of the key
348 : // kind and a concurrent reader. The reader can load a keySpanFrags and populate
349 : // it even though is has been invalidated (i.e. replaced with a newer
350 : // keySpanFrags).
351 : func (f *keySpanFrags) get(
352 : skl *arenaskl.Skiplist, cmp Compare, formatKey base.FormatKey, constructSpan constructSpan,
353 1 : ) []keyspan.Span {
354 1 : f.once.Do(func() {
355 1 : frag := &keyspan.Fragmenter{
356 1 : Cmp: cmp,
357 1 : Format: formatKey,
358 1 : Emit: func(fragmented keyspan.Span) {
359 1 : f.spans = append(f.spans, fragmented)
360 1 : },
361 : }
362 1 : it := skl.NewIter(nil, nil)
363 1 : var keysDst []keyspan.Key
364 1 : for key, val := it.First(); key != nil; key, val = it.Next() {
365 1 : s, err := constructSpan(*key, val.InPlaceValue(), keysDst)
366 1 : if err != nil {
367 0 : panic(err)
368 : }
369 1 : frag.Add(s)
370 1 : keysDst = s.Keys[len(s.Keys):]
371 : }
372 1 : frag.Finish()
373 : })
374 1 : return f.spans
375 : }
376 :
377 : // A keySpanCache is used to cache a set of fragmented spans. The cache is
378 : // invalidated whenever a key of the same kind is added to a memTable, and
379 : // populated when empty when a span iterator of that key kind is created.
380 : type keySpanCache struct {
381 : count atomic.Uint32
382 : frags atomic.Pointer[keySpanFrags]
383 : cmp Compare
384 : formatKey base.FormatKey
385 : constructSpan constructSpan
386 : skl *arenaskl.Skiplist
387 : }
388 :
389 : // Invalidate the current set of cached spans, indicating the number of
390 : // spans that were added.
391 1 : func (c *keySpanCache) invalidate(count uint32) {
392 1 : newCount := c.count.Add(count)
393 1 : var frags *keySpanFrags
394 1 :
395 1 : for {
396 1 : oldFrags := c.frags.Load()
397 1 : if oldFrags != nil && oldFrags.count >= newCount {
398 0 : // Someone else invalidated the cache before us and their invalidation
399 0 : // subsumes ours.
400 0 : break
401 : }
402 1 : if frags == nil {
403 1 : frags = &keySpanFrags{count: newCount}
404 1 : }
405 1 : if c.frags.CompareAndSwap(oldFrags, frags) {
406 1 : // We successfully invalidated the cache.
407 1 : break
408 : }
409 : // Someone else invalidated the cache. Loop and try again.
410 : }
411 : }
412 :
413 1 : func (c *keySpanCache) get() []keyspan.Span {
414 1 : frags := c.frags.Load()
415 1 : if frags == nil {
416 1 : return nil
417 1 : }
418 1 : return frags.get(c.skl, c.cmp, c.formatKey, c.constructSpan)
419 : }
|