LCOV - code coverage report
Current view: top level - pebble - commit.go (source / functions) Hit Total Coverage
Test: 2024-06-27 08:15Z 18b77232 - tests only.lcov Lines: 240 266 90.2 %
Date: 2024-06-27 08:16:22 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "runtime"
       9             :         "sync"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/pebble/batchrepr"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/record"
      16             : )
      17             : 
// commitQueue is a lock-free fixed-size single-producer, multi-consumer
// queue. The single producer can enqueue (push) to the head, and consumers can
// dequeue (pop) from the tail.
//
// It has the added feature that it nils out unused slots to avoid unnecessary
// retention of objects.
type commitQueue struct {
	// headTail packs together a 32-bit head index and a 32-bit tail index. Both
	// are free-running counters; a counter is turned into a position in slots
	// by masking it with len(slots)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.  A consumer
	// continues to own a slot outside this range until it nils the slot, at
	// which point ownership passes to the producer.
	//
	// The head index is stored in the most-significant bits so that we can
	// atomically add to it and the overflow is harmless.
	headTail atomic.Uint64

	// slots is a ring buffer of values stored in this queue. The size must be a
	// power of 2 (the index masking in enqueue and dequeueApplied relies on
	// it). A slot is in use until *both* the tail index has moved beyond
	// it and the slot value has been set to nil. The slot value is set to nil
	// atomically by the consumer and read atomically by the producer.
	slots [record.SyncConcurrency]atomic.Pointer[Batch]
}
      45             : 
// dequeueBits is the width, in bits, of each of the two indexes (head and
// tail) that are packed into commitQueue.headTail.
const dequeueBits = 32
      47             : 
      48           1 : func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
      49           1 :         const mask = 1<<dequeueBits - 1
      50           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      51           1 :         tail = uint32(ptrs & mask)
      52           1 :         return
      53           1 : }
      54             : 
      55           1 : func (q *commitQueue) pack(head, tail uint32) uint64 {
      56           1 :         const mask = 1<<dequeueBits - 1
      57           1 :         return (uint64(head) << dequeueBits) |
      58           1 :                 uint64(tail&mask)
      59           1 : }
      60             : 
// enqueue stores b in the slot at the head index and then advances head by
// one, which makes the batch visible to dequeueApplied.
//
// It panics if the head and tail indexes show the queue to be full. If the
// indexes show room but the head slot still holds a non-nil batch, it yields
// the processor in a loop until the slot has been cleared.
//
// The queue is single-producer: the callers visible in this file invoke
// enqueue while holding commitPipeline.mu.
func (q *commitQueue) enqueue(b *Batch) {
	ptrs := q.headTail.Load()
	head, tail := q.unpack(ptrs)
	if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
		// Queue is full. This should never be reached because commitPipeline.commitQueueSem
		// limits the number of concurrent operations.
		panic("pebble: not reached")
	}
	// Reduce the free-running head counter to a position in the ring buffer.
	slot := &q.slots[head&uint32(len(q.slots)-1)]

	// Check if the head slot has been released by dequeueApplied.
	for slot.Load() != nil {
		// Another goroutine is still cleaning up the tail, so the queue is
		// actually still full. We spin because this should resolve itself
		// momentarily.
		runtime.Gosched()
	}

	// The head slot is free, so we own it.
	slot.Store(b)

	// Increment head. This passes ownership of slot to dequeueApplied and acts as a
	// store barrier for writing the slot.
	q.headTail.Add(1 << dequeueBits)
}
      86             : 
// dequeueApplied removes the earliest enqueued Batch, if it is applied.
//
// Returns nil if the commit queue is empty or the earliest Batch is not yet
// applied. It also returns nil when the tail slot is observed to be nil, which
// means another goroutine has already taken that batch.
//
// May be called concurrently from multiple goroutines; ownership of the tail
// slot is decided by a compare-and-swap on headTail.
func (q *commitQueue) dequeueApplied() *Batch {
	for {
		ptrs := q.headTail.Load()
		head, tail := q.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil
		}

		// Reduce the free-running tail counter to a position in the ring
		// buffer.
		slot := &q.slots[tail&uint32(len(q.slots)-1)]
		b := slot.Load()
		if b == nil || !b.applied.Load() {
			// The batch is not ready to be dequeued, or another goroutine has
			// already dequeued it.
			return nil
		}

		// Confirm head and tail (for our speculative check above) and increment
		// tail. If this succeeds, then we own the slot at tail.
		ptrs2 := q.pack(head, tail+1)
		if q.headTail.CompareAndSwap(ptrs, ptrs2) {
			// We now own slot.
			//
			// Tell enqueue that we're done with this slot. Zeroing the slot is also
			// important so we don't leave behind references that could keep this object
			// live longer than necessary.
			slot.Store(nil)
			// At this point enqueue owns the slot.
			return b
		}
		// The compare-and-swap failed because headTail changed since it was
		// loaded (head advanced or another consumer moved tail); retry with a
		// fresh snapshot.
	}
}
     123             : 
// commitEnv contains the environment that a commitPipeline interacts
// with. This allows fine-grained testing of commitPipeline behavior without
// construction of an entire DB.
type commitEnv struct {
	// The next sequence number to give to a batch. Protected by
	// commitPipeline.mu for writes; it is updated with atomic operations so
	// that it can also be read concurrently.
	logSeqNum *base.AtomicSeqNum
	// The visible sequence number at which reads should be performed. Ratcheted
	// upwards atomically as batches are applied to the memtable.
	visibleSeqNum *base.AtomicSeqNum

	// Apply the batch to the specified memtable. Called concurrently.
	apply func(b *Batch, mem *memTable) error
	// Write the batch to the WAL. If wg != nil, the data will be persisted
	// asynchronously and done will be called on wg upon completion. If wg != nil
	// and err != nil, a failure to persist the WAL will populate *err. Returns
	// the memtable the batch should be applied to. Serial execution enforced by
	// commitPipeline.mu.
	write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
}
     144             : 
     145             : // A commitPipeline manages the stages of committing a set of mutations
     146             : // (contained in a single Batch) atomically to the DB. The steps are
     147             : // conceptually:
     148             : //
     149             : //  1. Write the batch to the WAL and optionally sync the WAL
     150             : //  2. Apply the mutations in the batch to the memtable
     151             : //
     152             : // These two simple steps are made complicated by the desire for high
     153             : // performance. In the absence of concurrency, performance is limited by how
     154             : // fast a batch can be written (and synced) to the WAL and then added to the
     155             : // memtable, both of which are outside the purview of the commit
     156             : // pipeline. Performance under concurrency is the primary concern of the commit
     157             : // pipeline, though it also needs to maintain two invariants:
     158             : //
     159             : //  1. Batches need to be written to the WAL in sequence number order.
     160             : //  2. Batches need to be made visible for reads in sequence number order. This
     161             : //     invariant arises from the use of a single sequence number which
     162             : //     indicates which mutations are visible.
     163             : //
     164             : // Taking these invariants into account, let's revisit the work the commit
     165             : // pipeline needs to perform. Writing the batch to the WAL is necessarily
     166             : // serialized as there is a single WAL object. The order of the entries in the
     167             : // WAL defines the sequence number order. Note that writing to the WAL is
     168             : // extremely fast, usually just a memory copy. Applying the mutations in a
     169             : // batch to the memtable can occur concurrently as the underlying skiplist
     170             : // supports concurrent insertions. Publishing the visible sequence number is
     171             : // another serialization point, but one with a twist: the visible sequence
     172             : // number cannot be bumped until the mutations for earlier batches have
     173             : // finished applying to the memtable (the visible sequence number only ratchets
     174             : // up). Lastly, if requested, the commit waits for the WAL to sync. Note that
     175             : // waiting for the WAL sync after ratcheting the visible sequence number allows
     176             : // another goroutine to read committed data before the WAL has synced. This is
     177             : // similar behavior to RocksDB's manual WAL flush functionality. Application
     178             : // code needs to protect against this if necessary.
     179             : //
     180             : // The full outline of the commit pipeline operation is as follows:
     181             : //
     182             : //      with commitPipeline mutex locked:
     183             : //        assign batch sequence number
     184             : //        write batch to WAL
     185             : //      (optionally) add batch to WAL sync list
     186             : //      apply batch to memtable (concurrently)
     187             : //      wait for earlier batches to apply
     188             : //      ratchet read sequence number
     189             : //      (optionally) wait for the WAL to sync
     190             : //
     191             : // As soon as a batch has been written to the WAL, the commitPipeline mutex is
     192             : // released allowing another batch to write to the WAL. Each commit operation
     193             : // individually applies its batch to the memtable providing concurrency. The
     194             : // WAL sync happens concurrently with applying to the memtable (see
     195             : // commitPipeline.syncLoop).
     196             : //
     197             : // The "waits for earlier batches to apply" work is more complicated than might
     198             : // be expected. The obvious approach would be to keep a queue of pending
     199             : // batches and for each batch to wait for the previous batch to finish
     200             : // committing. This approach was tried initially and turned out to be too
     201             : // slow. The problem is that it causes excessive goroutine activity as each
     202             : // committing goroutine needs to wake up in order for the next goroutine to be
     203             : // unblocked. The approach taken in the current code is conceptually similar,
     204             : // though it avoids waking a goroutine to perform work that another goroutine
     205             : // can perform. A commitQueue (a single-producer, multiple-consumer queue)
// holds the ordered list of committing batches. Addition to the queue is done
// while holding commitPipeline.mu, ensuring the same ordering of batches in
// the queue as the ordering in the WAL. When a batch finishes applying to the
     209             : // memtable, it atomically updates its Batch.applied field. Ratcheting of the
     210             : // visible sequence number is done by commitPipeline.publish which loops
     211             : // dequeueing "applied" batches and ratcheting the visible sequence number. If
     212             : // we hit an unapplied batch at the head of the queue we can block as we know
     213             : // that committing of that unapplied batch will eventually find our (applied)
     214             : // batch in the queue. See commitPipeline.publish for additional commentary.
type commitPipeline struct {
	// WARNING: The following struct `commitQueue` contains fields which will
	// be accessed atomically.
	//
	// Go allocations are guaranteed to be 64-bit aligned which we take advantage
	// of by placing the 64-bit fields which we access atomically at the beginning
	// of the commitPipeline struct.
	// For more information, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
	//
	// pending is the queue of pending batches to commit.
	pending commitQueue
	// env holds the sequence-number state and the WAL-write and memtable-apply
	// callbacks that the pipeline invokes.
	env     commitEnv
	// The commit path has two queues:
	// - commitPipeline.pending contains batches whose seqnums have not yet been
	//   published. It is a lock-free single producer multi consumer queue.
	// - LogWriter.flusher.syncQ contains state for batches that have asked for
	//   a sync. It is a lock-free single producer single consumer queue.
	// These lock-free queues have a fixed capacity. And since they are
	// lock-free, we cannot do blocking waits when pushing onto these queues, in
	// case they are full. Additionally, adding to these queues happens while
	// holding commitPipeline.mu, and we don't want to block while holding that
	// mutex since it is also needed by other code.
	//
	// Popping from these queues is independent and for a particular batch can
	// occur in either order, though it is more common that popping from the
	// commitPipeline.pending will happen first.
	//
	// Due to these constraints, we reserve a unit of space in each queue before
	// acquiring commitPipeline.mu, which also ensures that the push operation
	// is guaranteed to have space in the queue. The commitQueueSem and
	// logSyncQSem are used for this reservation.
	commitQueueSem chan struct{}
	logSyncQSem    chan struct{}
	// ingestSem is created with capacity 1 in newCommitPipeline. It is not
	// used by any code visible in this part of the file.
	// NOTE(review): presumably serializes ingestion-related work; confirm
	// against its users.
	ingestSem      chan struct{}
	// The mutex to use for synchronizing access to logSeqNum and serializing
	// calls to commitEnv.write().
	mu sync.Mutex
}
     252             : 
     253           1 : func newCommitPipeline(env commitEnv) *commitPipeline {
     254           1 :         p := &commitPipeline{
     255           1 :                 env: env,
     256           1 :                 // The capacity of both commitQueue.slots and syncQueue.slots is set to
     257           1 :                 // record.SyncConcurrency, which also determines the value of these
     258           1 :                 // semaphores. We used to have a single semaphore, which required that the
     259           1 :                 // capacity of these queues be the same. Now that we have two semaphores,
     260           1 :                 // the capacity of these queues could be changed to be different. Say half
     261           1 :                 // of the batches asked to be synced, but syncing took 5x the latency of
     262           1 :                 // adding to the memtable and publishing. Then syncQueue.slots could be
     263           1 :                 // sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
     264           1 :                 // that LogWriterMetrics.SyncQueueLen has high utilization under some
     265           1 :                 // workloads.
     266           1 :                 //
     267           1 :                 // NB: the commit concurrency is one less than SyncConcurrency because we
     268           1 :                 // have to allow one "slot" for a concurrent WAL rotation which will close
     269           1 :                 // and sync the WAL.
     270           1 :                 commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
     271           1 :                 logSyncQSem:    make(chan struct{}, record.SyncConcurrency-1),
     272           1 :                 ingestSem:      make(chan struct{}, 1),
     273           1 :         }
     274           1 :         return p
     275           1 : }
     276             : 
     277             : // directWrite is used to directly write to the WAL. commitPipeline.mu must be
     278             : // held while this is called. DB.mu must not be held. directWrite will only
     279             : // return once the WAL sync is complete. Note that DirectWrite is a special case
     280             : // function which is currently only used when ingesting sstables as a flushable.
     281             : // Reason carefully about the correctness argument when calling this function
     282             : // from any context.
     283           1 : func (p *commitPipeline) directWrite(b *Batch) error {
     284           1 :         var syncWG sync.WaitGroup
     285           1 :         var syncErr error
     286           1 :         syncWG.Add(1)
     287           1 :         p.logSyncQSem <- struct{}{}
     288           1 :         _, err := p.env.write(b, &syncWG, &syncErr)
     289           1 :         syncWG.Wait()
     290           1 :         err = firstError(err, syncErr)
     291           1 :         return err
     292           1 : }
     293             : 
     294             : // Commit the specified batch, writing it to the WAL, optionally syncing the
     295             : // WAL, and applying the batch to the memtable. Upon successful return the
     296             : // batch's mutations will be visible for reading.
     297             : // REQUIRES: noSyncWait => syncWAL
     298           1 : func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
     299           1 :         if b.Empty() {
     300           1 :                 return nil
     301           1 :         }
     302             : 
     303           1 :         commitStartTime := time.Now()
     304           1 :         // Acquire semaphores.
     305           1 :         p.commitQueueSem <- struct{}{}
     306           1 :         if syncWAL {
     307           1 :                 p.logSyncQSem <- struct{}{}
     308           1 :         }
     309           1 :         b.commitStats.SemaphoreWaitDuration = time.Since(commitStartTime)
     310           1 : 
     311           1 :         // Prepare the batch for committing: enqueuing the batch in the pending
     312           1 :         // queue, determining the batch sequence number and writing the data to the
     313           1 :         // WAL.
     314           1 :         //
     315           1 :         // NB: We set Batch.commitErr on error so that the batch won't be a candidate
     316           1 :         // for reuse. See Batch.release().
     317           1 :         mem, err := p.prepare(b, syncWAL, noSyncWait)
     318           1 :         if err != nil {
     319           0 :                 b.db = nil // prevent batch reuse on error
     320           0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     321           0 :                 // sitting in the pending queue. We should consider fixing this by also
     322           0 :                 // removing the batch from the pending queue.
     323           0 :                 return err
     324           0 :         }
     325             : 
     326             :         // Apply the batch to the memtable.
     327           1 :         if err := p.env.apply(b, mem); err != nil {
     328           0 :                 b.db = nil // prevent batch reuse on error
     329           0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     330           0 :                 // sitting in the pending queue. We should consider fixing this by also
     331           0 :                 // removing the batch from the pending queue.
     332           0 :                 return err
     333           0 :         }
     334             : 
     335             :         // Publish the batch sequence number.
     336           1 :         p.publish(b)
     337           1 : 
     338           1 :         <-p.commitQueueSem
     339           1 : 
     340           1 :         if !noSyncWait {
     341           1 :                 // Already waited for commit, so look at the error.
     342           1 :                 if b.commitErr != nil {
     343           0 :                         b.db = nil // prevent batch reuse on error
     344           0 :                         err = b.commitErr
     345           0 :                 }
     346             :         }
     347             :         // Else noSyncWait. The LogWriter can be concurrently writing to
     348             :         // b.commitErr. We will read b.commitErr in Batch.SyncWait after the
     349             :         // LogWriter is done writing.
     350             : 
     351           1 :         b.commitStats.TotalDuration = time.Since(commitStartTime)
     352           1 : 
     353           1 :         return err
     354             : }
     355             : 
// AllocateSeqNum allocates count sequence numbers, invokes the prepare
// callback, then the apply callback, and then publishes the sequence
// numbers. AllocateSeqNum does not write to the WAL or add entries to the
// memtable. AllocateSeqNum can be used to sequence an operation such as
// sstable ingestion within the commit pipeline. The prepare callback is
// invoked with commitPipeline.mu held, but note that DB.mu is not held and
// must be locked if necessary.
//
// Both callbacks receive the first of the allocated sequence numbers. That
// number is never 0: if the allocation would start at 0, one extra sequence
// number is consumed and the callbacks receive 1.
func (p *commitPipeline) AllocateSeqNum(
	count int, prepare func(seqNum base.SeqNum), apply func(seqNum base.SeqNum),
) {
	// This method is similar to Commit and prepare. Be careful about trying to
	// share additional code with those methods because Commit and prepare are
	// performance critical code paths.

	b := newBatch(nil)
	defer b.Close()

	// Give the batch a header-only payload and a count equal to the number of
	// sequence numbers being allocated, so that the log and visible sequence
	// numbers are incremented correctly. The single commit.Add accounts for the
	// Done performed when the batch is published.
	b.data = make([]byte, batchrepr.HeaderLen)
	b.setCount(uint32(count))
	b.commit.Add(1)

	// Reserve a slot in the pending queue before taking the mutex.
	p.commitQueueSem <- struct{}{}

	p.mu.Lock()

	// Enqueue the batch in the pending queue. Note that while the pending queue
	// is lock-free, we want the order of batches to be the same as the sequence
	// number order.
	p.pending.enqueue(b)

	// Assign the batch a sequence number. Note that we use atomic operations
	// here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
	// mutual exclusion for other goroutines writing to logSeqNum.
	logSeqNum := p.env.logSeqNum.Add(base.SeqNum(count)) - base.SeqNum(count)
	seqNum := logSeqNum
	if seqNum == 0 {
		// We can't use the value 0 for the global seqnum during ingestion, because
		// 0 indicates no global seqnum. So allocate one more seqnum.
		p.env.logSeqNum.Add(1)
		seqNum++
	}
	b.setSeqNum(seqNum)

	// Wait for any outstanding writes to the memtable to complete. This is
	// necessary for ingestion so that the check for memtable overlap can see any
	// writes that were sequenced before the ingestion. The spin loop is
	// unfortunate, but obviates the need for additional synchronization.
	//
	// The comparison uses logSeqNum, the value of the counter before this
	// allocation, not the possibly bumped seqNum. The loop runs with
	// commitPipeline.mu held.
	for {
		visibleSeqNum := p.env.visibleSeqNum.Load()
		if visibleSeqNum == logSeqNum {
			break
		}
		runtime.Gosched()
	}

	// Invoke the prepare callback. Note the lack of error reporting. Even if the
	// callback internally fails, the sequence number needs to be published in
	// order to allow the commit pipeline to proceed.
	prepare(b.SeqNum())

	p.mu.Unlock()

	// Invoke the apply callback.
	apply(b.SeqNum())

	// Publish the sequence number.
	p.publish(b)

	// The batch has left the pending queue, so give back its reservation.
	<-p.commitQueueSem
}
     428             : 
     429           1 : func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
     430           1 :         n := uint64(b.Count())
     431           1 :         if n == invalidBatchCount {
     432           0 :                 return nil, ErrInvalidBatch
     433           0 :         }
     434           1 :         var syncWG *sync.WaitGroup
     435           1 :         var syncErr *error
     436           1 :         switch {
     437           1 :         case !syncWAL:
     438           1 :                 // Only need to wait for the publish.
     439           1 :                 b.commit.Add(1)
     440             :         // Remaining cases represent syncWAL=true.
     441           1 :         case noSyncWait:
     442           1 :                 syncErr = &b.commitErr
     443           1 :                 syncWG = &b.fsyncWait
     444           1 :                 // Only need to wait synchronously for the publish. The user will
     445           1 :                 // (asynchronously) wait on the batch's fsyncWait.
     446           1 :                 b.commit.Add(1)
     447           1 :                 b.fsyncWait.Add(1)
     448           1 :         case !noSyncWait:
     449           1 :                 syncErr = &b.commitErr
     450           1 :                 syncWG = &b.commit
     451           1 :                 // Must wait for both the publish and the WAL fsync.
     452           1 :                 b.commit.Add(2)
     453             :         }
     454             : 
     455           1 :         p.mu.Lock()
     456           1 : 
     457           1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     458           1 :         // is lock-free, we want the order of batches to be the same as the sequence
     459           1 :         // number order.
     460           1 :         p.pending.enqueue(b)
     461           1 : 
     462           1 :         // Assign the batch a sequence number. Note that we use atomic operations
     463           1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     464           1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     465           1 :         b.setSeqNum(p.env.logSeqNum.Add(base.SeqNum(n)) - base.SeqNum(n))
     466           1 : 
     467           1 :         // Write the data to the WAL.
     468           1 :         mem, err := p.env.write(b, syncWG, syncErr)
     469           1 : 
     470           1 :         p.mu.Unlock()
     471           1 : 
     472           1 :         return mem, err
     473             : }
     474             : 
// publish marks b as applied and then repeatedly dequeues applied batches from
// the pending queue. For each dequeued batch it ratchets visibleSeqNum up to
// the end of that batch's sequence number range and calls Done on the batch's
// commit WaitGroup.
//
// When no further batch can be dequeued it blocks on b.commit, which returns
// once b has been published (by this or another goroutine) and any other work
// registered on that WaitGroup has finished. The time spent blocked is added
// to b.commitStats.CommitWaitDuration.
func (p *commitPipeline) publish(b *Batch) {
	// Mark the batch as applied.
	b.applied.Store(true)

	// Loop dequeuing applied batches from the pending queue. If our batch was
	// the head of the pending queue we are guaranteed that either we'll publish
	// it or someone else will dequeueApplied and publish it. If our batch is not the
	// head of the queue then either we'll dequeueApplied applied batches and reach our
	// batch or there is an unapplied batch blocking us. When that unapplied
	// batch applies it will go through the same process and publish our batch
	// for us.
	for {
		t := p.pending.dequeueApplied()
		if t == nil {
			// Wait for another goroutine to publish us. We might also be waiting for
			// the WAL sync to finish.
			now := time.Now()
			b.commit.Wait()
			b.commitStats.CommitWaitDuration += time.Since(now)
			break
		}
		// dequeueApplied only returns batches whose applied flag is set.
		if !t.applied.Load() {
			panic("not reached")
		}

		// We're responsible for publishing the sequence number for batch t, but
		// another concurrent goroutine might sneak in and publish the sequence
		// number for a subsequent batch. That's ok as all we're guaranteeing is
		// that the sequence number ratchets up.
		for {
			curSeqNum := p.env.visibleSeqNum.Load()
			newSeqNum := t.SeqNum() + base.SeqNum(t.Count())
			if newSeqNum <= curSeqNum {
				// t's sequence number has already been published.
				break
			}
			if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
				// We successfully published t's sequence number.
				break
			}
		}

		// Release whoever is waiting in publish on t's commit WaitGroup.
		t.commit.Done()
	}
}

Generated by: LCOV version 1.14