LCOV - code coverage report
Current view: top level - pebble - commit.go (source / functions) Hit Total Coverage
Test: 2023-11-18 08:15Z 717d49c2 - meta test only.lcov Lines: 228 266 85.7 %
Date: 2023-11-18 08:16:19 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "runtime"
       9             :         "sync"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/pebble/record"
      14             : )
      15             : 
      16             : // commitQueue is a lock-free fixed-size single-producer, multi-consumer
      17             : // queue. The single producer can enqueue (push) to the head, and consumers can
      18             : // dequeue (pop) from the tail.
      19             : //
      20             : // It has the added feature that it nils out unused slots to avoid unnecessary
      21             : // retention of objects.
      22             : type commitQueue struct {
      23             :         // headTail packs together a 32-bit head index and a 32-bit tail index. Both
      24             :         // are indexes into slots modulo len(slots)-1.
      25             :         //
      26             :         // tail = index of oldest data in queue
      27             :         // head = index of next slot to fill
      28             :         //
      29             :         // Slots in the range [tail, head) are owned by consumers.  A consumer
      30             :         // continues to own a slot outside this range until it nils the slot, at
      31             :         // which point ownership passes to the producer.
      32             :         //
      33             :         // The head index is stored in the most-significant bits so that we can
      34             :         // atomically add to it and the overflow is harmless.
      35             :         headTail atomic.Uint64
      36             : 
      37             :         // slots is a ring buffer of values stored in this queue. The size must be a
      38             :         // power of 2. A slot is in use until *both* the tail index has moved beyond
      39             :         // it and the slot value has been set to nil. The slot value is set to nil
      40             :         // atomically by the consumer and read atomically by the producer.
      41             :         slots [record.SyncConcurrency]atomic.Pointer[Batch]
      42             : }
      43             : 
      44             : const dequeueBits = 32
      45             : 
      46           1 : func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
      47           1 :         const mask = 1<<dequeueBits - 1
      48           1 :         head = uint32((ptrs >> dequeueBits) & mask)
      49           1 :         tail = uint32(ptrs & mask)
      50           1 :         return
      51           1 : }
      52             : 
      53           1 : func (q *commitQueue) pack(head, tail uint32) uint64 {
      54           1 :         const mask = 1<<dequeueBits - 1
      55           1 :         return (uint64(head) << dequeueBits) |
      56           1 :                 uint64(tail&mask)
      57           1 : }
      58             : 
      59           1 : func (q *commitQueue) enqueue(b *Batch) {
      60           1 :         ptrs := q.headTail.Load()
      61           1 :         head, tail := q.unpack(ptrs)
      62           1 :         if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
      63           0 :                 // Queue is full. This should never be reached because commitPipeline.commitQueueSem
      64           0 :                 // limits the number of concurrent operations.
      65           0 :                 panic("pebble: not reached")
      66             :         }
      67           1 :         slot := &q.slots[head&uint32(len(q.slots)-1)]
      68           1 : 
      69           1 :         // Check if the head slot has been released by dequeueApplied.
      70           1 :         for slot.Load() != nil {
      71           0 :                 // Another goroutine is still cleaning up the tail, so the queue is
      72           0 :                 // actually still full. We spin because this should resolve itself
      73           0 :                 // momentarily.
      74           0 :                 runtime.Gosched()
      75           0 :         }
      76             : 
      77             :         // The head slot is free, so we own it.
      78           1 :         slot.Store(b)
      79           1 : 
      80           1 :         // Increment head. This passes ownership of slot to dequeueApplied and acts as a
      81           1 :         // store barrier for writing the slot.
      82           1 :         q.headTail.Add(1 << dequeueBits)
      83             : }
      84             : 
      85             : // dequeueApplied removes the earliest enqueued Batch, if it is applied.
      86             : //
      87             : // Returns nil if the commit queue is empty or the earliest Batch is not yet
      88             : // applied.
      89           1 : func (q *commitQueue) dequeueApplied() *Batch {
      90           1 :         for {
      91           1 :                 ptrs := q.headTail.Load()
      92           1 :                 head, tail := q.unpack(ptrs)
      93           1 :                 if tail == head {
      94           1 :                         // Queue is empty.
      95           1 :                         return nil
      96           1 :                 }
      97             : 
      98           1 :                 slot := &q.slots[tail&uint32(len(q.slots)-1)]
      99           1 :                 b := slot.Load()
     100           1 :                 if b == nil || !b.applied.Load() {
     101           0 :                         // The batch is not ready to be dequeued, or another goroutine has
     102           0 :                         // already dequeued it.
     103           0 :                         return nil
     104           0 :                 }
     105             : 
     106             :                 // Confirm head and tail (for our speculative check above) and increment
     107             :                 // tail. If this succeeds, then we own the slot at tail.
     108           1 :                 ptrs2 := q.pack(head, tail+1)
     109           1 :                 if q.headTail.CompareAndSwap(ptrs, ptrs2) {
     110           1 :                         // We now own slot.
     111           1 :                         //
     112           1 :                         // Tell enqueue that we're done with this slot. Zeroing the slot is also
     113           1 :                         // important so we don't leave behind references that could keep this object
     114           1 :                         // live longer than necessary.
     115           1 :                         slot.Store(nil)
     116           1 :                         // At this point enqueue owns the slot.
     117           1 :                         return b
     118           1 :                 }
     119             :         }
     120             : }
     121             : 
     122             : // commitEnv contains the environment that a commitPipeline interacts
     123             : // with. This allows fine-grained testing of commitPipeline behavior without
     124             : // construction of an entire DB.
     125             : type commitEnv struct {
     126             :         // The next sequence number to give to a batch. Protected by
     127             :         // commitPipeline.mu.
     128             :         logSeqNum *atomic.Uint64
     129             :         // The visible sequence number at which reads should be performed. Ratcheted
     130             :         // upwards atomically as batches are applied to the memtable.
     131             :         visibleSeqNum *atomic.Uint64
     132             : 
     133             :         // Apply the batch to the specified memtable. Called concurrently.
     134             :         apply func(b *Batch, mem *memTable) error
     135             :         // Write the batch to the WAL. If wg != nil, the data will be persisted
     136             :         // asynchronously and done will be called on wg upon completion. If wg != nil
     137             :         // and err != nil, a failure to persist the WAL will populate *err. Returns
     138             :         // the memtable the batch should be applied to. Serial execution enforced by
     139             :         // commitPipeline.mu.
     140             :         write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
     141             : }
     142             : 
     143             : // A commitPipeline manages the stages of committing a set of mutations
     144             : // (contained in a single Batch) atomically to the DB. The steps are
     145             : // conceptually:
     146             : //
     147             : //  1. Write the batch to the WAL and optionally sync the WAL
     148             : //  2. Apply the mutations in the batch to the memtable
     149             : //
     150             : // These two simple steps are made complicated by the desire for high
     151             : // performance. In the absence of concurrency, performance is limited by how
     152             : // fast a batch can be written (and synced) to the WAL and then added to the
     153             : // memtable, both of which are outside the purview of the commit
     154             : // pipeline. Performance under concurrency is the primary concern of the commit
     155             : // pipeline, though it also needs to maintain two invariants:
     156             : //
     157             : //  1. Batches need to be written to the WAL in sequence number order.
     158             : //  2. Batches need to be made visible for reads in sequence number order. This
     159             : //     invariant arises from the use of a single sequence number which
     160             : //     indicates which mutations are visible.
     161             : //
     162             : // Taking these invariants into account, let's revisit the work the commit
     163             : // pipeline needs to perform. Writing the batch to the WAL is necessarily
     164             : // serialized as there is a single WAL object. The order of the entries in the
     165             : // WAL defines the sequence number order. Note that writing to the WAL is
     166             : // extremely fast, usually just a memory copy. Applying the mutations in a
     167             : // batch to the memtable can occur concurrently as the underlying skiplist
     168             : // supports concurrent insertions. Publishing the visible sequence number is
     169             : // another serialization point, but one with a twist: the visible sequence
     170             : // number cannot be bumped until the mutations for earlier batches have
     171             : // finished applying to the memtable (the visible sequence number only ratchets
     172             : // up). Lastly, if requested, the commit waits for the WAL to sync. Note that
     173             : // waiting for the WAL sync after ratcheting the visible sequence number allows
     174             : // another goroutine to read committed data before the WAL has synced. This is
     175             : // similar behavior to RocksDB's manual WAL flush functionality. Application
     176             : // code needs to protect against this if necessary.
     177             : //
     178             : // The full outline of the commit pipeline operation is as follows:
     179             : //
     180             : //      with commitPipeline mutex locked:
     181             : //        assign batch sequence number
     182             : //        write batch to WAL
     183             : //      (optionally) add batch to WAL sync list
     184             : //      apply batch to memtable (concurrently)
     185             : //      wait for earlier batches to apply
     186             : //      ratchet read sequence number
     187             : //      (optionally) wait for the WAL to sync
     188             : //
     189             : // As soon as a batch has been written to the WAL, the commitPipeline mutex is
     190             : // released allowing another batch to write to the WAL. Each commit operation
     191             : // individually applies its batch to the memtable providing concurrency. The
     192             : // WAL sync happens concurrently with applying to the memtable (see
     193             : // commitPipeline.syncLoop).
     194             : //
     195             : // The "waits for earlier batches to apply" work is more complicated than might
     196             : // be expected. The obvious approach would be to keep a queue of pending
     197             : // batches and for each batch to wait for the previous batch to finish
     198             : // committing. This approach was tried initially and turned out to be too
     199             : // slow. The problem is that it causes excessive goroutine activity as each
     200             : // committing goroutine needs to wake up in order for the next goroutine to be
     201             : // unblocked. The approach taken in the current code is conceptually similar,
     202             : // though it avoids waking a goroutine to perform work that another goroutine
     203             : // can perform. A commitQueue (a single-producer, multiple-consumer queue)
     204             : // holds the ordered list of committing batches. Addition to the queue is done
     205             : // while holding commitPipeline.mutex ensuring the same ordering of batches in
     206             : // the queue as the ordering in the WAL. When a batch finishes applying to the
     207             : // memtable, it atomically updates its Batch.applied field. Ratcheting of the
     208             : // visible sequence number is done by commitPipeline.publish which loops
     209             : // dequeueing "applied" batches and ratcheting the visible sequence number. If
     210             : // we hit an unapplied batch at the head of the queue we can block as we know
     211             : // that committing of that unapplied batch will eventually find our (applied)
     212             : // batch in the queue. See commitPipeline.publish for additional commentary.
     213             : type commitPipeline struct {
     214             :         // WARNING: The following struct `commitQueue` contains fields which will
     215             :         // be accessed atomically.
     216             :         //
     217             :         // Go allocations are guaranteed to be 64-bit aligned which we take advantage
     218             :         // of by placing the 64-bit fields which we access atomically at the beginning
     219             :         // of the commitPipeline struct.
     220             :         // For more information, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
     221             :         // Queue of pending batches to commit.
     222             :         pending commitQueue
     223             :         env     commitEnv
     224             :         // The commit path has two queues:
     225             :         // - commitPipeline.pending contains batches whose seqnums have not yet been
     226             :         //   published. It is a lock-free single producer multi consumer queue.
     227             :         // - LogWriter.flusher.syncQ contains state for batches that have asked for
     228             :         //   a sync. It is a lock-free single producer single consumer queue.
     229             :         // These lock-free queues have a fixed capacity. And since they are
     230             :         // lock-free, we cannot do blocking waits when pushing onto these queues, in
     231             :         // case they are full. Additionally, adding to these queues happens while
     232             :         // holding commitPipeline.mu, and we don't want to block while holding that
     233             :         // mutex since it is also needed by other code.
     234             :         //
     235             :         // Popping from these queues is independent and for a particular batch can
     236             :         // occur in either order, though it is more common that popping from the
     237             :         // commitPipeline.pending will happen first.
     238             :         //
     239             :         // Due to these constraints, we reserve a unit of space in each queue before
     240             :         // acquiring commitPipeline.mu, which also ensures that the push operation
     241             :         // is guaranteed to have space in the queue. The commitQueueSem and
     242             :         // logSyncQSem are used for this reservation.
     243             :         commitQueueSem chan struct{}
     244             :         logSyncQSem    chan struct{}
     245             :         ingestSem      chan struct{}
     246             :         // The mutex to use for synchronizing access to logSeqNum and serializing
     247             :         // calls to commitEnv.write().
     248             :         mu sync.Mutex
     249             : }
     250             : 
     251           1 : func newCommitPipeline(env commitEnv) *commitPipeline {
     252           1 :         p := &commitPipeline{
     253           1 :                 env: env,
     254           1 :                 // The capacity of both commitQueue.slots and syncQueue.slots is set to
     255           1 :                 // record.SyncConcurrency, which also determines the value of these
     256           1 :                 // semaphores. We used to have a single semaphore, which required that the
     257           1 :                 // capacity of these queues be the same. Now that we have two semaphores,
     258           1 :                 // the capacity of these queues could be changed to be different. Say half
     259           1 :                 // of the batches asked to be synced, but syncing took 5x the latency of
     260           1 :                 // adding to the memtable and publishing. Then syncQueue.slots could be
     261           1 :                 // sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
     262           1 :                 // that LogWriterMetrics.SyncQueueLen has high utilization under some
     263           1 :                 // workloads.
     264           1 :                 //
     265           1 :                 // NB: the commit concurrency is one less than SyncConcurrency because we
     266           1 :                 // have to allow one "slot" for a concurrent WAL rotation which will close
     267           1 :                 // and sync the WAL.
     268           1 :                 commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
     269           1 :                 logSyncQSem:    make(chan struct{}, record.SyncConcurrency-1),
     270           1 :                 ingestSem:      make(chan struct{}, 1),
     271           1 :         }
     272           1 :         return p
     273           1 : }
     274             : 
     275             : // directWrite is used to directly write to the WAL. commitPipeline.mu must be
     276             : // held while this is called. DB.mu must not be held. directWrite will only
     277             : // return once the WAL sync is complete. Note that DirectWrite is a special case
     278             : // function which is currently only used when ingesting sstables as a flushable.
     279             : // Reason carefully about the correctness argument when calling this function
     280             : // from any context.
     281           1 : func (p *commitPipeline) directWrite(b *Batch) error {
     282           1 :         var syncWG sync.WaitGroup
     283           1 :         var syncErr error
     284           1 :         syncWG.Add(1)
     285           1 :         p.logSyncQSem <- struct{}{}
     286           1 :         _, err := p.env.write(b, &syncWG, &syncErr)
     287           1 :         syncWG.Wait()
     288           1 :         err = firstError(err, syncErr)
     289           1 :         return err
     290           1 : }
     291             : 
     292             : // Commit the specified batch, writing it to the WAL, optionally syncing the
     293             : // WAL, and applying the batch to the memtable. Upon successful return the
     294             : // batch's mutations will be visible for reading.
     295             : // REQUIRES: noSyncWait => syncWAL
     296           1 : func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
     297           1 :         if b.Empty() {
     298           1 :                 return nil
     299           1 :         }
     300             : 
     301           1 :         commitStartTime := time.Now()
     302           1 :         // Acquire semaphores.
     303           1 :         p.commitQueueSem <- struct{}{}
     304           1 :         if syncWAL {
     305           1 :                 p.logSyncQSem <- struct{}{}
     306           1 :         }
     307           1 :         b.commitStats.SemaphoreWaitDuration = time.Since(commitStartTime)
     308           1 : 
     309           1 :         // Prepare the batch for committing: enqueuing the batch in the pending
     310           1 :         // queue, determining the batch sequence number and writing the data to the
     311           1 :         // WAL.
     312           1 :         //
     313           1 :         // NB: We set Batch.commitErr on error so that the batch won't be a candidate
     314           1 :         // for reuse. See Batch.release().
     315           1 :         mem, err := p.prepare(b, syncWAL, noSyncWait)
     316           1 :         if err != nil {
     317           0 :                 b.db = nil // prevent batch reuse on error
     318           0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     319           0 :                 // sitting in the pending queue. We should consider fixing this by also
     320           0 :                 // removing the batch from the pending queue.
     321           0 :                 return err
     322           0 :         }
     323             : 
     324             :         // Apply the batch to the memtable.
     325           1 :         if err := p.env.apply(b, mem); err != nil {
     326           0 :                 b.db = nil // prevent batch reuse on error
     327           0 :                 // NB: we are not doing <-p.commitQueueSem since the batch is still
     328           0 :                 // sitting in the pending queue. We should consider fixing this by also
     329           0 :                 // removing the batch from the pending queue.
     330           0 :                 return err
     331           0 :         }
     332             : 
     333             :         // Publish the batch sequence number.
     334           1 :         p.publish(b)
     335           1 : 
     336           1 :         <-p.commitQueueSem
     337           1 : 
     338           1 :         if !noSyncWait {
     339           1 :                 // Already waited for commit, so look at the error.
     340           1 :                 if b.commitErr != nil {
     341           0 :                         b.db = nil // prevent batch reuse on error
     342           0 :                         err = b.commitErr
     343           0 :                 }
     344             :         }
     345             :         // Else noSyncWait. The LogWriter can be concurrently writing to
     346             :         // b.commitErr. We will read b.commitErr in Batch.SyncWait after the
     347             :         // LogWriter is done writing.
     348             : 
     349           1 :         b.commitStats.TotalDuration = time.Since(commitStartTime)
     350           1 : 
     351           1 :         return err
     352             : }
     353             : 
     354             : // AllocateSeqNum allocates count sequence numbers, invokes the prepare
     355             : // callback, then the apply callback, and then publishes the sequence
     356             : // numbers. AllocateSeqNum does not write to the WAL or add entries to the
     357             : // memtable. AllocateSeqNum can be used to sequence an operation such as
     358             : // sstable ingestion within the commit pipeline. The prepare callback is
     359             : // invoked with commitPipeline.mu held, but note that DB.mu is not held and
     360             : // must be locked if necessary.
     361             : func (p *commitPipeline) AllocateSeqNum(
     362             :         count int, prepare func(seqNum uint64), apply func(seqNum uint64),
     363           1 : ) {
     364           1 :         // This method is similar to Commit and prepare. Be careful about trying to
     365           1 :         // share additional code with those methods because Commit and prepare are
     366           1 :         // performance critical code paths.
     367           1 : 
     368           1 :         b := newBatch(nil)
     369           1 :         defer b.release()
     370           1 : 
     371           1 :         // Give the batch a count of 1 so that the log and visible sequence number
     372           1 :         // are incremented correctly.
     373           1 :         b.data = make([]byte, batchHeaderLen)
     374           1 :         b.setCount(uint32(count))
     375           1 :         b.commit.Add(1)
     376           1 : 
     377           1 :         p.commitQueueSem <- struct{}{}
     378           1 : 
     379           1 :         p.mu.Lock()
     380           1 : 
     381           1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     382           1 :         // is lock-free, we want the order of batches to be the same as the sequence
     383           1 :         // number order.
     384           1 :         p.pending.enqueue(b)
     385           1 : 
     386           1 :         // Assign the batch a sequence number. Note that we use atomic operations
     387           1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     388           1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     389           1 :         logSeqNum := p.env.logSeqNum.Add(uint64(count)) - uint64(count)
     390           1 :         seqNum := logSeqNum
     391           1 :         if seqNum == 0 {
     392           0 :                 // We can't use the value 0 for the global seqnum during ingestion, because
     393           0 :                 // 0 indicates no global seqnum. So allocate one more seqnum.
     394           0 :                 p.env.logSeqNum.Add(1)
     395           0 :                 seqNum++
     396           0 :         }
     397           1 :         b.setSeqNum(seqNum)
     398           1 : 
     399           1 :         // Wait for any outstanding writes to the memtable to complete. This is
     400           1 :         // necessary for ingestion so that the check for memtable overlap can see any
     401           1 :         // writes that were sequenced before the ingestion. The spin loop is
     402           1 :         // unfortunate, but obviates the need for additional synchronization.
     403           1 :         for {
     404           1 :                 visibleSeqNum := p.env.visibleSeqNum.Load()
     405           1 :                 if visibleSeqNum == logSeqNum {
     406           1 :                         break
     407             :                 }
     408           0 :                 runtime.Gosched()
     409             :         }
     410             : 
     411             :         // Invoke the prepare callback. Note the lack of error reporting. Even if the
     412             :         // callback internally fails, the sequence number needs to be published in
     413             :         // order to allow the commit pipeline to proceed.
     414           1 :         prepare(b.SeqNum())
     415           1 : 
     416           1 :         p.mu.Unlock()
     417           1 : 
     418           1 :         // Invoke the apply callback.
     419           1 :         apply(b.SeqNum())
     420           1 : 
     421           1 :         // Publish the sequence number.
     422           1 :         p.publish(b)
     423           1 : 
     424           1 :         <-p.commitQueueSem
     425             : }
     426             : 
     427           1 : func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
     428           1 :         n := uint64(b.Count())
     429           1 :         if n == invalidBatchCount {
     430           0 :                 return nil, ErrInvalidBatch
     431           0 :         }
     432           1 :         var syncWG *sync.WaitGroup
     433           1 :         var syncErr *error
     434           1 :         switch {
     435           1 :         case !syncWAL:
     436           1 :                 // Only need to wait for the publish.
     437           1 :                 b.commit.Add(1)
     438             :         // Remaining cases represent syncWAL=true.
     439           1 :         case noSyncWait:
     440           1 :                 syncErr = &b.commitErr
     441           1 :                 syncWG = &b.fsyncWait
     442           1 :                 // Only need to wait synchronously for the publish. The user will
     443           1 :                 // (asynchronously) wait on the batch's fsyncWait.
     444           1 :                 b.commit.Add(1)
     445           1 :                 b.fsyncWait.Add(1)
     446           1 :         case !noSyncWait:
     447           1 :                 syncErr = &b.commitErr
     448           1 :                 syncWG = &b.commit
     449           1 :                 // Must wait for both the publish and the WAL fsync.
     450           1 :                 b.commit.Add(2)
     451             :         }
     452             : 
     453           1 :         p.mu.Lock()
     454           1 : 
     455           1 :         // Enqueue the batch in the pending queue. Note that while the pending queue
     456           1 :         // is lock-free, we want the order of batches to be the same as the sequence
     457           1 :         // number order.
     458           1 :         p.pending.enqueue(b)
     459           1 : 
     460           1 :         // Assign the batch a sequence number. Note that we use atomic operations
     461           1 :         // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
     462           1 :         // mutual exclusion for other goroutines writing to logSeqNum.
     463           1 :         b.setSeqNum(p.env.logSeqNum.Add(n) - n)
     464           1 : 
     465           1 :         // Write the data to the WAL.
     466           1 :         mem, err := p.env.write(b, syncWG, syncErr)
     467           1 : 
     468           1 :         p.mu.Unlock()
     469           1 : 
     470           1 :         return mem, err
     471             : }
     472             : 
     473           1 : func (p *commitPipeline) publish(b *Batch) {
     474           1 :         // Mark the batch as applied.
     475           1 :         b.applied.Store(true)
     476           1 : 
     477           1 :         // Loop dequeuing applied batches from the pending queue. If our batch was
     478           1 :         // the head of the pending queue we are guaranteed that either we'll publish
     479           1 :         // it or someone else will dequeueApplied and publish it. If our batch is not the
     480           1 :         // head of the queue then either we'll dequeueApplied applied batches and reach our
     481           1 :         // batch or there is an unapplied batch blocking us. When that unapplied
     482           1 :         // batch applies it will go through the same process and publish our batch
     483           1 :         // for us.
     484           1 :         for {
     485           1 :                 t := p.pending.dequeueApplied()
     486           1 :                 if t == nil {
     487           1 :                         // Wait for another goroutine to publish us. We might also be waiting for
     488           1 :                         // the WAL sync to finish.
     489           1 :                         now := time.Now()
     490           1 :                         b.commit.Wait()
     491           1 :                         b.commitStats.CommitWaitDuration += time.Since(now)
     492           1 :                         break
     493             :                 }
     494           1 :                 if !t.applied.Load() {
     495           0 :                         panic("not reached")
     496             :                 }
     497             : 
     498             :                 // We're responsible for publishing the sequence number for batch t, but
     499             :                 // another concurrent goroutine might sneak in and publish the sequence
     500             :                 // number for a subsequent batch. That's ok as all we're guaranteeing is
     501             :                 // that the sequence number ratchets up.
     502           1 :                 for {
     503           1 :                         curSeqNum := p.env.visibleSeqNum.Load()
     504           1 :                         newSeqNum := t.SeqNum() + uint64(t.Count())
     505           1 :                         if newSeqNum <= curSeqNum {
     506           0 :                                 // t's sequence number has already been published.
     507           0 :                                 break
     508             :                         }
     509           1 :                         if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
     510           1 :                                 // We successfully published t's sequence number.
     511           1 :                                 break
     512             :                         }
     513             :                 }
     514             : 
     515           1 :                 t.commit.Done()
     516             :         }
     517             : }

Generated by: LCOV version 1.14