Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "runtime"
9 : "sync"
10 : "sync/atomic"
11 : "time"
12 :
13 : "github.com/cockroachdb/pebble/batchrepr"
14 : "github.com/cockroachdb/pebble/record"
15 : )
16 :
17 : // commitQueue is a lock-free fixed-size single-producer, multi-consumer
18 : // queue. The single producer can enqueue (push) to the head, and consumers can
19 : // dequeue (pop) from the tail. The callers of enqueue visible in this file (prepare and AllocateSeqNum) hold commitPipeline.mu, which is what keeps the producer single.
20 : //
21 : // It has the added feature that it nils out unused slots to avoid unnecessary
22 : // retention of objects.
23 : type commitQueue struct {
24 : // headTail packs together a 32-bit head index and a 32-bit tail index. Both
25 : // are reduced to positions in slots by masking with len(slots)-1, i.e. modulo len(slots).
26 : //
27 : // tail = index of oldest data in queue
28 : // head = index of next slot to fill
29 : //
30 : // Slots in the range [tail, head) are owned by consumers. A consumer
31 : // continues to own a slot outside this range until it nils the slot, at
32 : // which point ownership passes to the producer.
33 : //
34 : // The head index is stored in the most-significant bits so that we can
35 : // atomically add to it (enqueue adds 1<<dequeueBits) and the overflow is harmless.
36 : headTail atomic.Uint64
37 :
38 : // slots is a ring buffer of values stored in this queue. The size must be a
39 : // power of 2 (the index masking above relies on it). A slot is in use until *both* the tail index has moved beyond
40 : // it and the slot value has been set to nil. The slot value is set to nil
41 : // atomically by the consumer (dequeueApplied) and read atomically by the producer (enqueue).
42 : slots [record.SyncConcurrency]atomic.Pointer[Batch]
43 : }
44 :
45 : const dequeueBits = 32
46 :
47 1 : func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
48 1 : const mask = 1<<dequeueBits - 1
49 1 : head = uint32((ptrs >> dequeueBits) & mask)
50 1 : tail = uint32(ptrs & mask)
51 1 : return
52 1 : }
53 :
54 1 : func (q *commitQueue) pack(head, tail uint32) uint64 {
55 1 : const mask = 1<<dequeueBits - 1
56 1 : return (uint64(head) << dequeueBits) |
57 1 : uint64(tail&mask)
58 1 : }
59 :
60 1 : func (q *commitQueue) enqueue(b *Batch) {
61 1 : ptrs := q.headTail.Load()
62 1 : head, tail := q.unpack(ptrs)
63 1 : if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
64 0 : // Queue is full. This should never be reached because commitPipeline.commitQueueSem
65 0 : // limits the number of concurrent operations.
66 0 : panic("pebble: not reached")
67 : }
68 1 : slot := &q.slots[head&uint32(len(q.slots)-1)]
69 1 :
70 1 : // Check if the head slot has been released by dequeueApplied.
71 1 : for slot.Load() != nil {
72 0 : // Another goroutine is still cleaning up the tail, so the queue is
73 0 : // actually still full. We spin because this should resolve itself
74 0 : // momentarily.
75 0 : runtime.Gosched()
76 0 : }
77 :
78 : // The head slot is free, so we own it.
79 1 : slot.Store(b)
80 1 :
81 1 : // Increment head. This passes ownership of slot to dequeueApplied and acts as a
82 1 : // store barrier for writing the slot.
83 1 : q.headTail.Add(1 << dequeueBits)
84 : }
85 :
86 : // dequeueApplied removes the earliest enqueued Batch, if it is applied.
87 : //
88 : // Returns nil if the commit queue is empty or the earliest Batch is not yet
89 : // applied.
90 1 : func (q *commitQueue) dequeueApplied() *Batch {
91 1 : for {
92 1 : ptrs := q.headTail.Load()
93 1 : head, tail := q.unpack(ptrs)
94 1 : if tail == head {
95 1 : // Queue is empty.
96 1 : return nil
97 1 : }
98 :
99 1 : slot := &q.slots[tail&uint32(len(q.slots)-1)]
100 1 : b := slot.Load()
101 1 : if b == nil || !b.applied.Load() {
102 1 : // The batch is not ready to be dequeued, or another goroutine has
103 1 : // already dequeued it.
104 1 : return nil
105 1 : }
106 :
107 : // Confirm head and tail (for our speculative check above) and increment
108 : // tail. If this succeeds, then we own the slot at tail.
109 1 : ptrs2 := q.pack(head, tail+1)
110 1 : if q.headTail.CompareAndSwap(ptrs, ptrs2) {
111 1 : // We now own slot.
112 1 : //
113 1 : // Tell enqueue that we're done with this slot. Zeroing the slot is also
114 1 : // important so we don't leave behind references that could keep this object
115 1 : // live longer than necessary.
116 1 : slot.Store(nil)
117 1 : // At this point enqueue owns the slot.
118 1 : return b
119 1 : }
120 : }
121 : }
122 :
123 : // commitEnv contains the environment that a commitPipeline interacts
124 : // with. This allows fine-grained testing of commitPipeline behavior without
125 : // construction of an entire DB.
126 : type commitEnv struct {
127 : // The next sequence number to give to a batch. Updates are serialized by
128 : // commitPipeline.mu (see prepare and AllocateSeqNum) and are performed with atomic adds.
129 : logSeqNum *atomic.Uint64
130 : // The visible sequence number at which reads should be performed. Ratcheted
131 : // upwards atomically (compare-and-swap in publish) as batches are applied to the memtable.
132 : visibleSeqNum *atomic.Uint64
133 :
134 : // Apply the batch to the specified memtable. Called concurrently: Commit invokes it without holding commitPipeline.mu.
135 : apply func(b *Batch, mem *memTable) error
136 : // Write the batch to the WAL. If wg != nil, the data will be persisted
137 : // asynchronously and wg.Done will be called upon completion. If wg != nil
138 : // and err != nil, a failure to persist the WAL will populate *err. Returns
139 : // the memtable the batch should be applied to. Serial execution enforced by
140 : // commitPipeline.mu.
141 : write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
142 : }
143 :
144 : // A commitPipeline manages the stages of committing a set of mutations
145 : // (contained in a single Batch) atomically to the DB. The steps are
146 : // conceptually:
147 : //
148 : // 1. Write the batch to the WAL and optionally sync the WAL
149 : // 2. Apply the mutations in the batch to the memtable
150 : //
151 : // These two simple steps are made complicated by the desire for high
152 : // performance. In the absence of concurrency, performance is limited by how
153 : // fast a batch can be written (and synced) to the WAL and then added to the
154 : // memtable, both of which are outside the purview of the commit
155 : // pipeline. Performance under concurrency is the primary concern of the commit
156 : // pipeline, though it also needs to maintain two invariants:
157 : //
158 : // 1. Batches need to be written to the WAL in sequence number order.
159 : // 2. Batches need to be made visible for reads in sequence number order. This
160 : // invariant arises from the use of a single sequence number which
161 : // indicates which mutations are visible.
162 : //
163 : // Taking these invariants into account, let's revisit the work the commit
164 : // pipeline needs to perform. Writing the batch to the WAL is necessarily
165 : // serialized as there is a single WAL object. The order of the entries in the
166 : // WAL defines the sequence number order. Note that writing to the WAL is
167 : // extremely fast, usually just a memory copy. Applying the mutations in a
168 : // batch to the memtable can occur concurrently as the underlying skiplist
169 : // supports concurrent insertions. Publishing the visible sequence number is
170 : // another serialization point, but one with a twist: the visible sequence
171 : // number cannot be bumped until the mutations for earlier batches have
172 : // finished applying to the memtable (the visible sequence number only ratchets
173 : // up). Lastly, if requested, the commit waits for the WAL to sync. Note that
174 : // waiting for the WAL sync after ratcheting the visible sequence number allows
175 : // another goroutine to read committed data before the WAL has synced. This is
176 : // similar behavior to RocksDB's manual WAL flush functionality. Application
177 : // code needs to protect against this if necessary.
178 : //
179 : // The full outline of the commit pipeline operation is as follows:
180 : //
181 : // with commitPipeline.mu locked:
182 : // assign batch sequence number
183 : // write batch to WAL
184 : // (optionally) add batch to WAL sync list
185 : // apply batch to memtable (concurrently)
186 : // wait for earlier batches to apply
187 : // ratchet read sequence number
188 : // (optionally) wait for the WAL to sync
189 : //
190 : // As soon as a batch has been written to the WAL, commitPipeline.mu is
191 : // released allowing another batch to write to the WAL. Each commit operation
192 : // individually applies its batch to the memtable providing concurrency. The
193 : // WAL sync happens concurrently with applying to the memtable (see
194 : // commitPipeline.syncLoop).
195 : //
196 : // The "waits for earlier batches to apply" work is more complicated than might
197 : // be expected. The obvious approach would be to keep a queue of pending
198 : // batches and for each batch to wait for the previous batch to finish
199 : // committing. This approach was tried initially and turned out to be too
200 : // slow. The problem is that it causes excessive goroutine activity as each
201 : // committing goroutine needs to wake up in order for the next goroutine to be
202 : // unblocked. The approach taken in the current code is conceptually similar,
203 : // though it avoids waking a goroutine to perform work that another goroutine
204 : // can perform. A commitQueue (a single-producer, multiple-consumer queue)
205 : // holds the ordered list of committing batches. Addition to the queue is done
206 : // while holding commitPipeline.mu, ensuring the same ordering of batches in
207 : // the queue as the ordering in the WAL. When a batch finishes applying to the
208 : // memtable, it atomically updates its Batch.applied field. Ratcheting of the
209 : // visible sequence number is done by commitPipeline.publish which loops
210 : // dequeueing "applied" batches and ratcheting the visible sequence number. If
211 : // we hit an unapplied batch at the front of the queue (the oldest entry, i.e. the commitQueue tail) we can block as we know
212 : // that committing of that unapplied batch will eventually find our (applied)
213 : // batch in the queue. See commitPipeline.publish for additional commentary.
214 : type commitPipeline struct {
215 : // WARNING: The following struct `commitQueue` contains fields which will
216 : // be accessed atomically.
217 : //
218 : // Go allocations are guaranteed to be 64-bit aligned which we take advantage
219 : // of by placing the 64-bit fields which we access atomically at the beginning
220 : // of the commitPipeline struct. For more information, see
221 : // https://golang.org/pkg/sync/atomic/#pkg-note-BUG. NOTE(review): commitQueue.headTail is an atomic.Uint64, which sync/atomic documents as automatically 64-bit aligned, so this placement rule may be historical; confirm before reordering fields.
222 : // pending is the queue of batches whose sequence numbers have not yet been published.
223 : pending commitQueue
224 : env commitEnv
225 : // The commit path has two queues:
226 : // - commitPipeline.pending contains batches whose seqnums have not yet been
227 : // published. It is a lock-free single producer multi consumer queue.
228 : // - LogWriter.flusher.syncQ contains state for batches that have asked for
229 : // a sync. It is a lock-free single producer single consumer queue.
230 : // These lock-free queues have a fixed capacity. And since they are
231 : // lock-free, we cannot do blocking waits when pushing onto these queues, in
232 : // case they are full. Additionally, adding to these queues happens while
233 : // holding commitPipeline.mu, and we don't want to block while holding that
234 : // mutex since it is also needed by other code.
235 : //
236 : // Popping from these queues is independent and for a particular batch can
237 : // occur in either order, though it is more common that popping from the
238 : // commitPipeline.pending will happen first.
239 : //
240 : // Due to these constraints, we reserve a unit of space in each queue before
241 : // acquiring commitPipeline.mu, which also ensures that the push operation
242 : // is guaranteed to have space in the queue. The commitQueueSem and
243 : // logSyncQSem are used for this reservation.
244 : commitQueueSem chan struct{}
245 : logSyncQSem chan struct{}
246 : ingestSem chan struct{} // Buffered with capacity 1 by newCommitPipeline.
247 : // The mutex to use for synchronizing access to logSeqNum and serializing
248 : // calls to commitEnv.write().
249 : mu sync.Mutex
250 : }
251 :
252 1 : func newCommitPipeline(env commitEnv) *commitPipeline {
253 1 : p := &commitPipeline{
254 1 : env: env,
255 1 : // The capacity of both commitQueue.slots and syncQueue.slots is set to
256 1 : // record.SyncConcurrency, which also determines the value of these
257 1 : // semaphores. We used to have a single semaphore, which required that the
258 1 : // capacity of these queues be the same. Now that we have two semaphores,
259 1 : // the capacity of these queues could be changed to be different. Say half
260 1 : // of the batches asked to be synced, but syncing took 5x the latency of
261 1 : // adding to the memtable and publishing. Then syncQueue.slots could be
262 1 : // sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
263 1 : // that LogWriterMetrics.SyncQueueLen has high utilization under some
264 1 : // workloads.
265 1 : //
266 1 : // NB: the commit concurrency is one less than SyncConcurrency because we
267 1 : // have to allow one "slot" for a concurrent WAL rotation which will close
268 1 : // and sync the WAL.
269 1 : commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
270 1 : logSyncQSem: make(chan struct{}, record.SyncConcurrency-1),
271 1 : ingestSem: make(chan struct{}, 1),
272 1 : }
273 1 : return p
274 1 : }
275 :
276 : // directWrite is used to directly write to the WAL. commitPipeline.mu must be
277 : // held while this is called. DB.mu must not be held. directWrite will only
278 : // return once the WAL sync is complete. Note that DirectWrite is a special case
279 : // function which is currently only used when ingesting sstables as a flushable.
280 : // Reason carefully about the correctness argument when calling this function
281 : // from any context.
282 1 : func (p *commitPipeline) directWrite(b *Batch) error {
283 1 : var syncWG sync.WaitGroup
284 1 : var syncErr error
285 1 : syncWG.Add(1)
286 1 : p.logSyncQSem <- struct{}{}
287 1 : _, err := p.env.write(b, &syncWG, &syncErr)
288 1 : syncWG.Wait()
289 1 : err = firstError(err, syncErr)
290 1 : return err
291 1 : }
292 :
293 : // Commit the specified batch, writing it to the WAL, optionally syncing the
294 : // WAL, and applying the batch to the memtable. Upon successful return the
295 : // batch's mutations will be visible for reading.
296 : // REQUIRES: noSyncWait => syncWAL
297 1 : func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
298 1 : if b.Empty() {
299 1 : return nil
300 1 : }
301 :
302 1 : commitStartTime := time.Now()
303 1 : // Acquire semaphores.
304 1 : p.commitQueueSem <- struct{}{}
305 1 : if syncWAL {
306 1 : p.logSyncQSem <- struct{}{}
307 1 : }
308 1 : b.commitStats.SemaphoreWaitDuration = time.Since(commitStartTime)
309 1 :
310 1 : // Prepare the batch for committing: enqueuing the batch in the pending
311 1 : // queue, determining the batch sequence number and writing the data to the
312 1 : // WAL.
313 1 : //
314 1 : // NB: We set Batch.commitErr on error so that the batch won't be a candidate
315 1 : // for reuse. See Batch.release().
316 1 : mem, err := p.prepare(b, syncWAL, noSyncWait)
317 1 : if err != nil {
318 0 : b.db = nil // prevent batch reuse on error
319 0 : // NB: we are not doing <-p.commitQueueSem since the batch is still
320 0 : // sitting in the pending queue. We should consider fixing this by also
321 0 : // removing the batch from the pending queue.
322 0 : return err
323 0 : }
324 :
325 : // Apply the batch to the memtable.
326 1 : if err := p.env.apply(b, mem); err != nil {
327 0 : b.db = nil // prevent batch reuse on error
328 0 : // NB: we are not doing <-p.commitQueueSem since the batch is still
329 0 : // sitting in the pending queue. We should consider fixing this by also
330 0 : // removing the batch from the pending queue.
331 0 : return err
332 0 : }
333 :
334 : // Publish the batch sequence number.
335 1 : p.publish(b)
336 1 :
337 1 : <-p.commitQueueSem
338 1 :
339 1 : if !noSyncWait {
340 1 : // Already waited for commit, so look at the error.
341 1 : if b.commitErr != nil {
342 0 : b.db = nil // prevent batch reuse on error
343 0 : err = b.commitErr
344 0 : }
345 : }
346 : // Else noSyncWait. The LogWriter can be concurrently writing to
347 : // b.commitErr. We will read b.commitErr in Batch.SyncWait after the
348 : // LogWriter is done writing.
349 :
350 1 : b.commitStats.TotalDuration = time.Since(commitStartTime)
351 1 :
352 1 : return err
353 : }
354 :
355 : // AllocateSeqNum allocates count sequence numbers, invokes the prepare
356 : // callback, then the apply callback, and then publishes the sequence
357 : // numbers. AllocateSeqNum does not write to the WAL or add entries to the
358 : // memtable. AllocateSeqNum can be used to sequence an operation such as
359 : // sstable ingestion within the commit pipeline. The prepare callback is
360 : // invoked with commitPipeline.mu held, but note that DB.mu is not held and
361 : // must be locked if necessary.
362 : func (p *commitPipeline) AllocateSeqNum(
363 : count int, prepare func(seqNum uint64), apply func(seqNum uint64),
364 1 : ) {
365 1 : // This method is similar to Commit and prepare. Be careful about trying to
366 1 : // share additional code with those methods because Commit and prepare are
367 1 : // performance critical code paths.
368 1 :
369 1 : b := newBatch(nil)
370 1 : defer b.Close()
371 1 :
372 1 : // Give the batch a count of 1 so that the log and visible sequence number
373 1 : // are incremented correctly.
374 1 : b.data = make([]byte, batchrepr.HeaderLen)
375 1 : b.setCount(uint32(count))
376 1 : b.commit.Add(1)
377 1 :
378 1 : p.commitQueueSem <- struct{}{}
379 1 :
380 1 : p.mu.Lock()
381 1 :
382 1 : // Enqueue the batch in the pending queue. Note that while the pending queue
383 1 : // is lock-free, we want the order of batches to be the same as the sequence
384 1 : // number order.
385 1 : p.pending.enqueue(b)
386 1 :
387 1 : // Assign the batch a sequence number. Note that we use atomic operations
388 1 : // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
389 1 : // mutual exclusion for other goroutines writing to logSeqNum.
390 1 : logSeqNum := p.env.logSeqNum.Add(uint64(count)) - uint64(count)
391 1 : seqNum := logSeqNum
392 1 : if seqNum == 0 {
393 1 : // We can't use the value 0 for the global seqnum during ingestion, because
394 1 : // 0 indicates no global seqnum. So allocate one more seqnum.
395 1 : p.env.logSeqNum.Add(1)
396 1 : seqNum++
397 1 : }
398 1 : b.setSeqNum(seqNum)
399 1 :
400 1 : // Wait for any outstanding writes to the memtable to complete. This is
401 1 : // necessary for ingestion so that the check for memtable overlap can see any
402 1 : // writes that were sequenced before the ingestion. The spin loop is
403 1 : // unfortunate, but obviates the need for additional synchronization.
404 1 : for {
405 1 : visibleSeqNum := p.env.visibleSeqNum.Load()
406 1 : if visibleSeqNum == logSeqNum {
407 1 : break
408 : }
409 1 : runtime.Gosched()
410 : }
411 :
412 : // Invoke the prepare callback. Note the lack of error reporting. Even if the
413 : // callback internally fails, the sequence number needs to be published in
414 : // order to allow the commit pipeline to proceed.
415 1 : prepare(b.SeqNum())
416 1 :
417 1 : p.mu.Unlock()
418 1 :
419 1 : // Invoke the apply callback.
420 1 : apply(b.SeqNum())
421 1 :
422 1 : // Publish the sequence number.
423 1 : p.publish(b)
424 1 :
425 1 : <-p.commitQueueSem
426 : }
427 :
428 1 : func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
429 1 : n := uint64(b.Count())
430 1 : if n == invalidBatchCount {
431 0 : return nil, ErrInvalidBatch
432 0 : }
433 1 : var syncWG *sync.WaitGroup
434 1 : var syncErr *error
435 1 : switch {
436 1 : case !syncWAL:
437 1 : // Only need to wait for the publish.
438 1 : b.commit.Add(1)
439 : // Remaining cases represent syncWAL=true.
440 1 : case noSyncWait:
441 1 : syncErr = &b.commitErr
442 1 : syncWG = &b.fsyncWait
443 1 : // Only need to wait synchronously for the publish. The user will
444 1 : // (asynchronously) wait on the batch's fsyncWait.
445 1 : b.commit.Add(1)
446 1 : b.fsyncWait.Add(1)
447 1 : case !noSyncWait:
448 1 : syncErr = &b.commitErr
449 1 : syncWG = &b.commit
450 1 : // Must wait for both the publish and the WAL fsync.
451 1 : b.commit.Add(2)
452 : }
453 :
454 1 : p.mu.Lock()
455 1 :
456 1 : // Enqueue the batch in the pending queue. Note that while the pending queue
457 1 : // is lock-free, we want the order of batches to be the same as the sequence
458 1 : // number order.
459 1 : p.pending.enqueue(b)
460 1 :
461 1 : // Assign the batch a sequence number. Note that we use atomic operations
462 1 : // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
463 1 : // mutual exclusion for other goroutines writing to logSeqNum.
464 1 : b.setSeqNum(p.env.logSeqNum.Add(n) - n)
465 1 :
466 1 : // Write the data to the WAL.
467 1 : mem, err := p.env.write(b, syncWG, syncErr)
468 1 :
469 1 : p.mu.Unlock()
470 1 :
471 1 : return mem, err
472 : }
473 :
474 1 : func (p *commitPipeline) publish(b *Batch) {
475 1 : // Mark the batch as applied.
476 1 : b.applied.Store(true)
477 1 :
478 1 : // Loop dequeuing applied batches from the pending queue. If our batch was
479 1 : // the head of the pending queue we are guaranteed that either we'll publish
480 1 : // it or someone else will dequeueApplied and publish it. If our batch is not the
481 1 : // head of the queue then either we'll dequeueApplied applied batches and reach our
482 1 : // batch or there is an unapplied batch blocking us. When that unapplied
483 1 : // batch applies it will go through the same process and publish our batch
484 1 : // for us.
485 1 : for {
486 1 : t := p.pending.dequeueApplied()
487 1 : if t == nil {
488 1 : // Wait for another goroutine to publish us. We might also be waiting for
489 1 : // the WAL sync to finish.
490 1 : now := time.Now()
491 1 : b.commit.Wait()
492 1 : b.commitStats.CommitWaitDuration += time.Since(now)
493 1 : break
494 : }
495 1 : if !t.applied.Load() {
496 0 : panic("not reached")
497 : }
498 :
499 : // We're responsible for publishing the sequence number for batch t, but
500 : // another concurrent goroutine might sneak in and publish the sequence
501 : // number for a subsequent batch. That's ok as all we're guaranteeing is
502 : // that the sequence number ratchets up.
503 1 : for {
504 1 : curSeqNum := p.env.visibleSeqNum.Load()
505 1 : newSeqNum := t.SeqNum() + uint64(t.Count())
506 1 : if newSeqNum <= curSeqNum {
507 1 : // t's sequence number has already been published.
508 1 : break
509 : }
510 1 : if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
511 1 : // We successfully published t's sequence number.
512 1 : break
513 : }
514 : }
515 :
516 1 : t.commit.Done()
517 : }
518 : }
|