// Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/pebble/record"
)

// commitQueue is a lock-free, fixed-size, single-producer, multi-consumer
// queue. The single producer can enqueue (push) to the head, and consumers
// can dequeue (pop) from the tail.
//
// It has the added feature that it nils out unused slots to avoid unnecessary
// retention of objects.
type commitQueue struct {
	// headTail packs together a 32-bit head index and a 32-bit tail index.
	// Both are indexes into slots, modulo len(slots).
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers. A consumer
	// continues to own a slot outside this range until it nils the slot, at
	// which point ownership passes to the producer.
	//
	// The head index is stored in the most-significant bits so that we can
	// atomically add to it and the overflow is harmless.
	headTail atomic.Uint64

	// slots is a ring buffer of values stored in this queue. The size must be
	// a power of 2. A slot is in use until *both* the tail index has moved
	// beyond it and the slot value has been set to nil. The slot value is set
	// to nil atomically by the consumer and read atomically by the producer.
	slots [record.SyncConcurrency]atomic.Pointer[Batch]
}

const dequeueBits = 32

func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

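// pack combines head and tail into a single headTail value; unpack reverses
// the operation. As an illustrative worked example (values chosen here, not
// taken from the code): pack(18, 3) == 18<<32 | 3, and unpack(18<<32 | 3)
// returns head == 18 and tail == 3. With a hypothetical len(slots) of 16,
// those indexes would address slots[18&15] == slots[2] and slots[3&15] ==
// slots[3].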
func (q *commitQueue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

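// enqueue adds b at the head of the queue. It must only be called by the
// single producer; in this file both callers invoke it while holding
// commitPipeline.mu, which serializes producers.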
func (q *commitQueue) enqueue(b *Batch) {
	ptrs := q.headTail.Load()
	head, tail := q.unpack(ptrs)
	if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
		// Queue is full. This should never be reached because
		// commitPipeline.commitQueueSem limits the number of concurrent
		// operations.
		panic("pebble: not reached")
	}
	slot := &q.slots[head&uint32(len(q.slots)-1)]

	// Check if the head slot has been released by dequeueApplied.
	for slot.Load() != nil {
		// Another goroutine is still cleaning up the tail, so the queue is
		// actually still full. We spin because this should resolve itself
		// momentarily.
		runtime.Gosched()
	}

	// The head slot is free, so we own it.
	slot.Store(b)

	// Increment head. This passes ownership of slot to dequeueApplied and
	// acts as a store barrier for writing the slot.
	q.headTail.Add(1 << dequeueBits)
}

// dequeueApplied removes the earliest enqueued Batch, if it is applied.
//
// Returns nil if the commit queue is empty or the earliest Batch is not yet
// applied.
func (q *commitQueue) dequeueApplied() *Batch {
	for {
		ptrs := q.headTail.Load()
		head, tail := q.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil
		}

		slot := &q.slots[tail&uint32(len(q.slots)-1)]
		b := slot.Load()
		if b == nil || !b.applied.Load() {
			// The batch is not ready to be dequeued, or another goroutine has
			// already dequeued it.
			return nil
		}

		// Confirm head and tail (for our speculative check above) and
		// increment tail. If this succeeds, then we own the slot at tail.
		ptrs2 := q.pack(head, tail+1)
		if q.headTail.CompareAndSwap(ptrs, ptrs2) {
			// We now own slot.
			//
			// Tell enqueue that we're done with this slot. Zeroing the slot
			// is also important so we don't leave behind references that
			// could keep this object live longer than necessary.
			slot.Store(nil)
			// At this point enqueue owns the slot.
			return b
		}
	}
}
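
// A descriptive recap of the slot ownership protocol implemented above (not
// code from the source): the producer stores a batch in slots[head] and then
// bumps head; a consumer CASes tail from i to i+1 to claim slots[i], and
// after reading the batch stores nil to hand the slot back to the producer.
// enqueue spins only in the rare case where head wraps around to a slot whose
// consumer has bumped tail but has not yet nil'ed out the slot.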

// commitEnv contains the environment that a commitPipeline interacts
// with. This allows fine-grained testing of commitPipeline behavior without
// construction of an entire DB.
type commitEnv struct {
	// The next sequence number to give to a batch. Protected by
	// commitPipeline.mu.
	logSeqNum *atomic.Uint64
	// The visible sequence number at which reads should be performed.
	// Ratcheted upwards atomically as batches are applied to the memtable.
	visibleSeqNum *atomic.Uint64

	// Apply the batch to the specified memtable. Called concurrently.
	apply func(b *Batch, mem *memTable) error
	// Write the batch to the WAL. If wg != nil, the data will be persisted
	// asynchronously and done will be called on wg upon completion. If wg !=
	// nil and err != nil, a failure to persist the WAL will populate *err.
	// Returns the memtable the batch should be applied to. Serial execution
	// enforced by commitPipeline.mu.
	write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
}

// A commitPipeline manages the stages of committing a set of mutations
// (contained in a single Batch) atomically to the DB. The steps are
// conceptually:
//
// 1. Write the batch to the WAL and optionally sync the WAL
// 2. Apply the mutations in the batch to the memtable
//
// These two simple steps are made complicated by the desire for high
// performance. In the absence of concurrency, performance is limited by how
// fast a batch can be written (and synced) to the WAL and then added to the
// memtable, both of which are outside the purview of the commit
// pipeline. Performance under concurrency is the primary concern of the
// commit pipeline, though it also needs to maintain two invariants:
//
// 1. Batches need to be written to the WAL in sequence number order.
// 2. Batches need to be made visible for reads in sequence number order. This
//    invariant arises from the use of a single sequence number which
//    indicates which mutations are visible.
//
// Taking these invariants into account, let's revisit the work the commit
// pipeline needs to perform. Writing the batch to the WAL is necessarily
// serialized as there is a single WAL object. The order of the entries in the
// WAL defines the sequence number order. Note that writing to the WAL is
// extremely fast, usually just a memory copy. Applying the mutations in a
// batch to the memtable can occur concurrently as the underlying skiplist
// supports concurrent insertions. Publishing the visible sequence number is
// another serialization point, but one with a twist: the visible sequence
// number cannot be bumped until the mutations for earlier batches have
// finished applying to the memtable (the visible sequence number only
// ratchets up). Lastly, if requested, the commit waits for the WAL to sync.
// Note that waiting for the WAL sync after ratcheting the visible sequence
// number allows another goroutine to read committed data before the WAL has
// synced. This is similar behavior to RocksDB's manual WAL flush
// functionality. Application code needs to protect against this if necessary.
//
// The full outline of the commit pipeline operation is as follows:
//
//	with commitPipeline mutex locked:
//	  assign batch sequence number
//	  write batch to WAL
//	(optionally) add batch to WAL sync list
//	apply batch to memtable (concurrently)
//	wait for earlier batches to apply
//	ratchet read sequence number
//	(optionally) wait for the WAL to sync
//
// As soon as a batch has been written to the WAL, the commitPipeline mutex is
// released allowing another batch to write to the WAL. Each commit operation
// individually applies its batch to the memtable providing concurrency. The
// WAL sync happens concurrently with applying to the memtable (see
// commitPipeline.syncLoop).
//
// The "waits for earlier batches to apply" work is more complicated than
// might be expected. The obvious approach would be to keep a queue of pending
// batches and for each batch to wait for the previous batch to finish
// committing. This approach was tried initially and turned out to be too
// slow. The problem is that it causes excessive goroutine activity as each
// committing goroutine needs to wake up in order for the next goroutine to be
// unblocked. The approach taken in the current code is conceptually similar,
// though it avoids waking a goroutine to perform work that another goroutine
// can perform. A commitQueue (a single-producer, multiple-consumer queue)
// holds the ordered list of committing batches. Addition to the queue is done
// while holding commitPipeline.mutex ensuring the same ordering of batches in
// the queue as the ordering in the WAL. When a batch finishes applying to the
// memtable, it atomically updates its Batch.applied field. Ratcheting of the
// visible sequence number is done by commitPipeline.publish which loops
// dequeueing "applied" batches and ratcheting the visible sequence number. If
// we hit an unapplied batch at the head of the queue we can block as we know
// that committing of that unapplied batch will eventually find our (applied)
// batch in the queue. See commitPipeline.publish for additional commentary.
type commitPipeline struct {
	// WARNING: The following struct `commitQueue` contains fields which are
	// accessed atomically.
	//
	// The first word of an allocation is guaranteed to be 64-bit aligned,
	// which we take advantage of by placing the 64-bit fields that we access
	// atomically at the beginning of the commitPipeline struct.
	// For more information, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
	// Queue of pending batches to commit.
	pending commitQueue
	env     commitEnv
	// The commit path has two queues:
	// - commitPipeline.pending contains batches whose seqnums have not yet
	//   been published. It is a lock-free single-producer, multi-consumer
	//   queue.
	// - LogWriter.flusher.syncQ contains state for batches that have asked
	//   for a sync. It is a lock-free single-producer, single-consumer queue.
	// These lock-free queues have a fixed capacity, and since they are
	// lock-free we cannot do blocking waits when pushing onto them in case
	// they are full. Additionally, adding to these queues happens while
	// holding commitPipeline.mu, and we don't want to block while holding
	// that mutex since it is also needed by other code.
	//
	// Popping from these queues is independent and for a particular batch can
	// occur in either order, though it is more common that popping from
	// commitPipeline.pending will happen first.
	//
	// Due to these constraints, we reserve a unit of space in each queue
	// before acquiring commitPipeline.mu, which also ensures that the push
	// operation is guaranteed to have space in the queue. The commitQueueSem
	// and logSyncQSem are used for this reservation.
	commitQueueSem chan struct{}
	logSyncQSem    chan struct{}
	ingestSem      chan struct{}
	// The mutex to use for synchronizing access to logSeqNum and serializing
	// calls to commitEnv.write().
	mu sync.Mutex
}
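
// As a minimal usage sketch (hypothetical caller; in the real code paths
// DB.Apply and friends drive the pipeline):
//
//	pipeline := newCommitPipeline(commitEnv{
//		// logSeqNum, visibleSeqNum, apply, and write supplied by the DB.
//	})
//	// Write b to the WAL, sync the WAL, apply b to the memtable, and
//	// publish its sequence number before returning.
//	err := pipeline.Commit(b, true /* syncWAL */, false /* noSyncWait */)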

func newCommitPipeline(env commitEnv) *commitPipeline {
	p := &commitPipeline{
		env: env,
		// The capacity of both commitQueue.slots and syncQueue.slots is set
		// to record.SyncConcurrency, which also determines the value of these
		// semaphores. We used to have a single semaphore, which required that
		// the capacity of these queues be the same. Now that we have two
		// semaphores, the capacity of these queues could be changed to be
		// different. Say half of the batches asked to be synced, but syncing
		// took 5x the latency of adding to the memtable and publishing. Then
		// syncQueue.slots could be sized as 0.5*5 of the commitQueue.slots.
		// We can explore this if we find that LogWriterMetrics.SyncQueueLen
		// has high utilization under some workloads.
		//
		// NB: the commit concurrency is one less than SyncConcurrency because
		// we have to allow one "slot" for a concurrent WAL rotation which
		// will close and sync the WAL.
		commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
		logSyncQSem:    make(chan struct{}, record.SyncConcurrency-1),
		ingestSem:      make(chan struct{}, 1),
	}
	return p
}

// directWrite is used to directly write to the WAL. commitPipeline.mu must be
// held while this is called. DB.mu must not be held. directWrite only returns
// once the WAL sync is complete. Note that directWrite is a special-case
// function which is currently only used when ingesting sstables as a
// flushable. Reason carefully about the correctness argument when calling
// this function from any context.
func (p *commitPipeline) directWrite(b *Batch) error {
	var syncWG sync.WaitGroup
	var syncErr error
	syncWG.Add(1)
	p.logSyncQSem <- struct{}{}
	_, err := p.env.write(b, &syncWG, &syncErr)
	syncWG.Wait()
	err = firstError(err, syncErr)
	return err
}

// Commit the specified batch, writing it to the WAL, optionally syncing the
// WAL, and applying the batch to the memtable. Upon successful return the
// batch's mutations will be visible for reading.
// REQUIRES: noSyncWait => syncWAL
func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
	if b.Empty() {
		return nil
	}

	commitStartTime := time.Now()
	// Acquire semaphores.
	p.commitQueueSem <- struct{}{}
	if syncWAL {
		p.logSyncQSem <- struct{}{}
	}
	b.commitStats.SemaphoreWaitDuration = time.Since(commitStartTime)

	// Prepare the batch for committing: enqueuing the batch in the pending
	// queue, determining the batch sequence number and writing the data to
	// the WAL.
	//
	// NB: We set Batch.commitErr on error so that the batch won't be a
	// candidate for reuse. See Batch.release().
	mem, err := p.prepare(b, syncWAL, noSyncWait)
	if err != nil {
		b.db = nil // prevent batch reuse on error
		// NB: we are not doing <-p.commitQueueSem since the batch is still
		// sitting in the pending queue. We should consider fixing this by
		// also removing the batch from the pending queue.
		return err
	}

	// Apply the batch to the memtable.
	if err := p.env.apply(b, mem); err != nil {
		b.db = nil // prevent batch reuse on error
		// NB: we are not doing <-p.commitQueueSem since the batch is still
		// sitting in the pending queue. We should consider fixing this by
		// also removing the batch from the pending queue.
		return err
	}

	// Publish the batch sequence number.
	p.publish(b)

	<-p.commitQueueSem

	if !noSyncWait {
		// Already waited for commit, so look at the error.
		if b.commitErr != nil {
			b.db = nil // prevent batch reuse on error
			err = b.commitErr
		}
	}
	// Else noSyncWait. The LogWriter can be concurrently writing to
	// b.commitErr. We will read b.commitErr in Batch.SyncWait after the
	// LogWriter is done writing.

	b.commitStats.TotalDuration = time.Since(commitStartTime)

	return err
}
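
// A sketch of the noSyncWait pattern (hypothetical caller): Commit returns
// once the batch has been published, and the caller later waits for the WAL
// fsync via Batch.SyncWait, which blocks on b.fsyncWait and then reads
// b.commitErr:
//
//	if err := p.Commit(b, true /* syncWAL */, true /* noSyncWait */); err != nil {
//		return err
//	}
//	// ... overlap other work with the WAL sync ...
//	if err := b.SyncWait(); err != nil {
//		return err
//	}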

// AllocateSeqNum allocates count sequence numbers, invokes the prepare
// callback, then the apply callback, and then publishes the sequence
// numbers. AllocateSeqNum does not write to the WAL or add entries to the
// memtable. AllocateSeqNum can be used to sequence an operation such as
// sstable ingestion within the commit pipeline. The prepare callback is
// invoked with commitPipeline.mu held, but note that DB.mu is not held and
// must be locked if necessary.
func (p *commitPipeline) AllocateSeqNum(
	count int, prepare func(seqNum uint64), apply func(seqNum uint64),
) {
	// This method is similar to Commit and prepare. Be careful about trying
	// to share additional code with those methods because Commit and prepare
	// are performance critical code paths.

	b := newBatch(nil)
	defer b.release()

	// Set the batch count to count so that the log and visible sequence
	// numbers are incremented correctly (by count rather than by 1).
	b.data = make([]byte, batchHeaderLen)
	b.setCount(uint32(count))
	b.commit.Add(1)

	p.commitQueueSem <- struct{}{}

	p.mu.Lock()

	// Enqueue the batch in the pending queue. Note that while the pending
	// queue is lock-free, we want the order of batches to be the same as the
	// sequence number order.
	p.pending.enqueue(b)

	// Assign the batch a sequence number. Note that we use atomic operations
	// here to handle concurrent reads of logSeqNum. commitPipeline.mu
	// provides mutual exclusion for other goroutines writing to logSeqNum.
	logSeqNum := p.env.logSeqNum.Add(uint64(count)) - uint64(count)
	seqNum := logSeqNum
	if seqNum == 0 {
		// We can't use the value 0 for the global seqnum during ingestion,
		// because 0 indicates no global seqnum. So allocate one more seqnum.
		p.env.logSeqNum.Add(1)
		seqNum++
	}
	b.setSeqNum(seqNum)

	// Wait for any outstanding writes to the memtable to complete. This is
	// necessary for ingestion so that the check for memtable overlap can see
	// any writes that were sequenced before the ingestion. The spin loop is
	// unfortunate, but obviates the need for additional synchronization.
	for {
		visibleSeqNum := p.env.visibleSeqNum.Load()
		if visibleSeqNum == logSeqNum {
			break
		}
		runtime.Gosched()
	}

	// Invoke the prepare callback. Note the lack of error reporting. Even if
	// the callback internally fails, the sequence number needs to be
	// published in order to allow the commit pipeline to proceed.
	prepare(b.SeqNum())

	p.mu.Unlock()

	// Invoke the apply callback.
	apply(b.SeqNum())

	// Publish the sequence number.
	p.publish(b)

	<-p.commitQueueSem
}
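
// A worked example of the sequence number arithmetic above (illustrative
// numbers): if logSeqNum is 7 and count is 3, the Add returns 10, so the
// batch is assigned seqNum 7 and covers seqnums 7, 8, and 9. publish then
// ratchets visibleSeqNum to b.SeqNum()+b.Count() == 10.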

func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
	n := uint64(b.Count())
	if n == invalidBatchCount {
		return nil, ErrInvalidBatch
	}
	var syncWG *sync.WaitGroup
	var syncErr *error
	switch {
	case !syncWAL:
		// Only need to wait for the publish.
		b.commit.Add(1)
	// Remaining cases represent syncWAL=true.
	case noSyncWait:
		syncErr = &b.commitErr
		syncWG = &b.fsyncWait
		// Only need to wait synchronously for the publish. The user will
		// (asynchronously) wait on the batch's fsyncWait.
		b.commit.Add(1)
		b.fsyncWait.Add(1)
	case !noSyncWait:
		syncErr = &b.commitErr
		syncWG = &b.commit
		// Must wait for both the publish and the WAL fsync.
		b.commit.Add(2)
	}

	p.mu.Lock()

	// Enqueue the batch in the pending queue. Note that while the pending
	// queue is lock-free, we want the order of batches to be the same as the
	// sequence number order.
	p.pending.enqueue(b)

	// Assign the batch a sequence number. Note that we use atomic operations
	// here to handle concurrent reads of logSeqNum. commitPipeline.mu
	// provides mutual exclusion for other goroutines writing to logSeqNum.
	b.setSeqNum(p.env.logSeqNum.Add(n) - n)

	// Write the data to the WAL.
	mem, err := p.env.write(b, syncWG, syncErr)

	p.mu.Unlock()

	return mem, err
}
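
// A recap of the wait-group accounting in prepare (descriptive note): with
// syncWAL=false or noSyncWait=true, b.commit counts only the publish, so
// publish(b) in Commit returns as soon as the seqnum is visible. With
// syncWAL=true and noSyncWait=false, b.commit counts both the publish and
// the WAL fsync, so publish(b) additionally waits for the sync to complete.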

func (p *commitPipeline) publish(b *Batch) {
	// Mark the batch as applied.
	b.applied.Store(true)

	// Loop dequeuing applied batches from the pending queue. If our batch was
	// the head of the pending queue we are guaranteed that either we'll
	// publish it or someone else will dequeue and publish it. If our batch is
	// not the head of the queue then either we'll dequeue applied batches and
	// reach our batch or there is an unapplied batch blocking us. When that
	// unapplied batch applies it will go through the same process and publish
	// our batch for us.
	for {
		t := p.pending.dequeueApplied()
		if t == nil {
			// Wait for another goroutine to publish us. We might also be
			// waiting for the WAL sync to finish.
			now := time.Now()
			b.commit.Wait()
			b.commitStats.CommitWaitDuration += time.Since(now)
			break
		}
		if !t.applied.Load() {
			panic("not reached")
		}

		// We're responsible for publishing the sequence number for batch t,
		// but another concurrent goroutine might sneak in and publish the
		// sequence number for a subsequent batch. That's ok as all we're
		// guaranteeing is that the sequence number ratchets up.
		for {
			curSeqNum := p.env.visibleSeqNum.Load()
			newSeqNum := t.SeqNum() + uint64(t.Count())
			if newSeqNum <= curSeqNum {
				// t's sequence number has already been published.
				break
			}
			if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
				// We successfully published t's sequence number.
				break
			}
		}

		t.commit.Done()
	}
}
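
// An illustrative interleaving for the CAS loop in publish: suppose
// visibleSeqNum is 10 and two goroutines race to publish batches whose
// mutations end at seqnums 13 and 15. If the 15 publisher wins the CAS
// first, the 13 publisher's retry observes curSeqNum == 15 >= 13 and breaks:
// seqnum 13 is already visible, and the sequence number only ratchets up.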
|